Files
Kiro/internal/storage/dynamodb.go
Daniel Romischer 9f07b0c6f9 Complete tasks 3.2-3.3: Data models and DynamoDB table schemas
- Defined all 8 data models (Page, Widget, Bookmark, Note, TagAssociation, Group, Share, Preferences)
- Implemented DynamoDB table creation for all tables with proper schemas
- Added GSIs for efficient querying (UserBookmarksIndex, UserNotesIndex, TagItemsIndex, UserSharesIndex)
- Comprehensive test coverage for all table schemas
- Updated init-db command to create all tables
2026-02-18 22:55:06 -05:00

759 lines
21 KiB
Go

package storage
import (
	"context"
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/retry"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// DynamoDBClient wraps the AWS SDK v2 DynamoDB client. Its methods create
// the application's tables and forward item-level operations to the wrapped
// client, prefixing returned errors with the name of the failed operation.
type DynamoDBClient struct {
// client is the underlying SDK client that every method delegates to.
client *dynamodb.Client
}
// NewDynamoDBClient loads the default AWS configuration and returns a
// DynamoDBClient built from it.
//
// The configuration carries a standard retryer limited to 5 attempts with
// exponential jitter backoff capped at 20 seconds. When endpoint is non-empty
// it replaces the service endpoint, which is how a local DynamoDB instance is
// targeted. An error is returned only if the AWS configuration cannot be
// loaded.
func NewDynamoDBClient(ctx context.Context, endpoint string) (*DynamoDBClient, error) {
	const backoffCeiling = 20 * time.Second

	// One retryer instance is built up front and handed out by the factory below.
	standardRetryer := retry.NewStandard(func(opts *retry.StandardOptions) {
		opts.MaxAttempts = 5
		opts.MaxBackoff = backoffCeiling
		opts.Backoff = retry.NewExponentialJitterBackoff(backoffCeiling)
	})
	retryerFactory := func() aws.Retryer {
		return standardRetryer
	}

	cfg, err := config.LoadDefaultConfig(
		ctx,
		config.WithRetryer(retryerFactory),
		// No custom HTTP client is supplied (nil); connection pooling is
		// presumably left to the SDK's default client — confirm against the SDK.
		config.WithHTTPClient(nil),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// An explicit endpoint overrides the default one (for local DynamoDB).
	if endpoint != "" {
		cfg.BaseEndpoint = aws.String(endpoint)
	}

	return &DynamoDBClient{client: dynamodb.NewFromConfig(cfg)}, nil
}
// CreateUsersTable creates the Users table in DynamoDB if it does not exist.
//
// The table has a single string partition key, user_id, and uses
// pay-per-request billing. If DescribeTable succeeds the table is treated as
// already present and nil is returned. A DescribeTable failure other than
// ResourceNotFoundException is returned to the caller instead of being
// interpreted as "table missing". After CreateTable succeeds the call blocks
// on the table-exists waiter for at most five minutes.
func (db *DynamoDBClient) CreateUsersTable(ctx context.Context) error {
	tableName := "Users"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		// Table already exists.
		return nil
	}
	// Only "not found" means the table must be created; credential, network
	// or throttling errors must not fall through to CreateTable.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe Users table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("user_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("user_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create Users table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for Users table to be active: %w", err)
	}
	return nil
}
// CreatePagesTable creates the Pages table if it does not exist.
//
// The table uses a composite primary key: user_id (string partition key) and
// page_id (string sort key), with pay-per-request billing. If DescribeTable
// succeeds the table is treated as already present and nil is returned. A
// DescribeTable failure other than ResourceNotFoundException is returned to
// the caller. After CreateTable succeeds the call blocks on the table-exists
// waiter for at most five minutes.
func (db *DynamoDBClient) CreatePagesTable(ctx context.Context) error {
	tableName := "Pages"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe Pages table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("user_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("page_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("user_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
			{
				AttributeName: aws.String("page_id"),
				KeyType:       types.KeyTypeRange, // Sort key
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create Pages table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for Pages table to be active: %w", err)
	}
	return nil
}
// CreateWidgetsTable creates the Widgets table if it does not exist.
//
// The table uses a composite primary key: page_id (string partition key) and
// widget_id (string sort key), with pay-per-request billing. If DescribeTable
// succeeds the table is treated as already present and nil is returned. A
// DescribeTable failure other than ResourceNotFoundException is returned to
// the caller. After CreateTable succeeds the call blocks on the table-exists
// waiter for at most five minutes.
func (db *DynamoDBClient) CreateWidgetsTable(ctx context.Context) error {
	tableName := "Widgets"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe Widgets table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("page_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("widget_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("page_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
			{
				AttributeName: aws.String("widget_id"),
				KeyType:       types.KeyTypeRange, // Sort key
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create Widgets table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for Widgets table to be active: %w", err)
	}
	return nil
}
// CreateBookmarksTable creates the Bookmarks table, including its
// UserBookmarksIndex global secondary index, if the table does not exist.
//
// Primary key: widget_id (string partition key) and bookmark_id (string sort
// key). UserBookmarksIndex is keyed by user_id (string) and created_at
// (number) and projects all attributes. Billing is pay-per-request. If
// DescribeTable succeeds the table is treated as already present and nil is
// returned. A DescribeTable failure other than ResourceNotFoundException is
// returned to the caller. After CreateTable succeeds the call blocks on the
// table-exists waiter for at most five minutes.
func (db *DynamoDBClient) CreateBookmarksTable(ctx context.Context) error {
	tableName := "Bookmarks"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe Bookmarks table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("widget_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("bookmark_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("user_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("created_at"),
				AttributeType: types.ScalarAttributeTypeN,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("widget_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
			{
				AttributeName: aws.String("bookmark_id"),
				KeyType:       types.KeyTypeRange, // Sort key
			},
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("UserBookmarksIndex"),
				KeySchema: []types.KeySchemaElement{
					{
						AttributeName: aws.String("user_id"),
						KeyType:       types.KeyTypeHash, // Partition key
					},
					{
						AttributeName: aws.String("created_at"),
						KeyType:       types.KeyTypeRange, // Sort key
					},
				},
				Projection: &types.Projection{
					ProjectionType: types.ProjectionTypeAll,
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create Bookmarks table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for Bookmarks table to be active: %w", err)
	}
	return nil
}
// CreateNotesTable creates the Notes table, including its UserNotesIndex
// global secondary index, if the table does not exist.
//
// Primary key: widget_id (string partition key) and note_id (string sort
// key). UserNotesIndex is keyed by user_id (string) and created_at (number)
// and projects all attributes. Billing is pay-per-request. If DescribeTable
// succeeds the table is treated as already present and nil is returned. A
// DescribeTable failure other than ResourceNotFoundException is returned to
// the caller. After CreateTable succeeds the call blocks on the table-exists
// waiter for at most five minutes.
func (db *DynamoDBClient) CreateNotesTable(ctx context.Context) error {
	tableName := "Notes"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe Notes table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("widget_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("note_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("user_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("created_at"),
				AttributeType: types.ScalarAttributeTypeN,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("widget_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
			{
				AttributeName: aws.String("note_id"),
				KeyType:       types.KeyTypeRange, // Sort key
			},
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("UserNotesIndex"),
				KeySchema: []types.KeySchemaElement{
					{
						AttributeName: aws.String("user_id"),
						KeyType:       types.KeyTypeHash, // Partition key
					},
					{
						AttributeName: aws.String("created_at"),
						KeyType:       types.KeyTypeRange, // Sort key
					},
				},
				Projection: &types.Projection{
					ProjectionType: types.ProjectionTypeAll,
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create Notes table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for Notes table to be active: %w", err)
	}
	return nil
}
// CreateTagAssociationsTable creates the TagAssociations table, including its
// TagItemsIndex global secondary index, if the table does not exist.
//
// Primary key: item_id (string partition key) and tag_name (string sort key).
// TagItemsIndex is keyed by tag_name and user_id (both strings) and projects
// only item_id, item_type and created_at in addition to the keys. Billing is
// pay-per-request. If DescribeTable succeeds the table is treated as already
// present and nil is returned. A DescribeTable failure other than
// ResourceNotFoundException is returned to the caller. After CreateTable
// succeeds the call blocks on the table-exists waiter for at most five
// minutes.
func (db *DynamoDBClient) CreateTagAssociationsTable(ctx context.Context) error {
	tableName := "TagAssociations"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe TagAssociations table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("item_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("tag_name"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("user_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("item_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
			{
				AttributeName: aws.String("tag_name"),
				KeyType:       types.KeyTypeRange, // Sort key
			},
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("TagItemsIndex"),
				KeySchema: []types.KeySchemaElement{
					{
						AttributeName: aws.String("tag_name"),
						KeyType:       types.KeyTypeHash, // Partition key
					},
					{
						AttributeName: aws.String("user_id"),
						KeyType:       types.KeyTypeRange, // Sort key
					},
				},
				Projection: &types.Projection{
					ProjectionType:   types.ProjectionTypeInclude,
					NonKeyAttributes: []string{"item_id", "item_type", "created_at"},
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create TagAssociations table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for TagAssociations table to be active: %w", err)
	}
	return nil
}
// CreateGroupsTable creates the Groups table if it does not exist.
//
// The table uses a composite primary key: widget_id (string partition key)
// and group_id (string sort key), with pay-per-request billing. If
// DescribeTable succeeds the table is treated as already present and nil is
// returned. A DescribeTable failure other than ResourceNotFoundException is
// returned to the caller. After CreateTable succeeds the call blocks on the
// table-exists waiter for at most five minutes.
func (db *DynamoDBClient) CreateGroupsTable(ctx context.Context) error {
	tableName := "Groups"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe Groups table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("widget_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("group_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("widget_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
			{
				AttributeName: aws.String("group_id"),
				KeyType:       types.KeyTypeRange, // Sort key
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create Groups table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for Groups table to be active: %w", err)
	}
	return nil
}
// CreateSharedItemsTable creates the SharedItems table, including its
// UserSharesIndex global secondary index, if the table does not exist.
//
// Primary key: share_id (string partition key, no sort key). UserSharesIndex
// is keyed by user_id (string) and created_at (number) and projects share_id,
// item_id, item_type and access_count in addition to the keys. Billing is
// pay-per-request. If DescribeTable succeeds the table is treated as already
// present and nil is returned. A DescribeTable failure other than
// ResourceNotFoundException is returned to the caller. After CreateTable
// succeeds the call blocks on the table-exists waiter for at most five
// minutes.
func (db *DynamoDBClient) CreateSharedItemsTable(ctx context.Context) error {
	tableName := "SharedItems"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe SharedItems table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("share_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("user_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
			{
				AttributeName: aws.String("created_at"),
				AttributeType: types.ScalarAttributeTypeN,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("share_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("UserSharesIndex"),
				KeySchema: []types.KeySchemaElement{
					{
						AttributeName: aws.String("user_id"),
						KeyType:       types.KeyTypeHash, // Partition key
					},
					{
						AttributeName: aws.String("created_at"),
						KeyType:       types.KeyTypeRange, // Sort key
					},
				},
				Projection: &types.Projection{
					ProjectionType:   types.ProjectionTypeInclude,
					NonKeyAttributes: []string{"share_id", "item_id", "item_type", "access_count"},
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create SharedItems table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for SharedItems table to be active: %w", err)
	}
	return nil
}
// CreatePreferencesTable creates the Preferences table if it does not exist.
//
// The table has a single string partition key, user_id, and uses
// pay-per-request billing. If DescribeTable succeeds the table is treated as
// already present and nil is returned. A DescribeTable failure other than
// ResourceNotFoundException is returned to the caller. After CreateTable
// succeeds the call blocks on the table-exists waiter for at most five
// minutes.
func (db *DynamoDBClient) CreatePreferencesTable(ctx context.Context) error {
	tableName := "Preferences"

	// Check if table already exists.
	_, err := db.client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
	if err == nil {
		return nil
	}
	// Only "not found" means the table must be created; any other failure is
	// reported rather than masked by a CreateTable attempt.
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe Preferences table: %w", err)
	}

	// Create table.
	_, err = db.client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("user_id"),
				AttributeType: types.ScalarAttributeTypeS,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("user_id"),
				KeyType:       types.KeyTypeHash, // Partition key
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
	if err != nil {
		return fmt.Errorf("failed to create Preferences table: %w", err)
	}

	// Wait for the table to be usable. The waiter's limit is a time.Duration,
	// so the unit has to be spelled out explicitly.
	waiter := dynamodb.NewTableExistsWaiter(db.client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}, 5*time.Minute)
	if err != nil {
		return fmt.Errorf("failed waiting for Preferences table to be active: %w", err)
	}
	return nil
}
// GetClient exposes the wrapped *dynamodb.Client for callers that need an SDK
// operation this type does not provide a method for.
func (db *DynamoDBClient) GetClient() *dynamodb.Client {
	return db.client
}
// TransactWriteItems forwards a transactional write to the wrapped client and
// discards the SDK output. A failure is returned wrapped with the prefix
// "transaction write failed". No retry logic lives in this method; any
// retrying is whatever the wrapped client performs.
func (db *DynamoDBClient) TransactWriteItems(ctx context.Context, input *dynamodb.TransactWriteItemsInput) error {
	if _, err := db.client.TransactWriteItems(ctx, input); err != nil {
		return fmt.Errorf("transaction write failed: %w", err)
	}
	return nil
}
// BatchGetItems issues a BatchGetItem request. When the first response
// reports no unprocessed keys it is returned as is; otherwise the leftover
// keys are handed to retryUnprocessedKeys, whose result is returned. A
// failure of the initial request is wrapped with the prefix
// "batch get failed".
func (db *DynamoDBClient) BatchGetItems(ctx context.Context, input *dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) {
	first, err := db.client.BatchGetItem(ctx, input)
	if err != nil {
		return nil, fmt.Errorf("batch get failed: %w", err)
	}

	// Nothing left over: the single round trip fetched everything.
	if len(first.UnprocessedKeys) == 0 {
		return first, nil
	}

	// Leftover keys are retried with exponential backoff.
	return db.retryUnprocessedKeys(ctx, first)
}
// BatchWriteItems issues a BatchWriteItem request. When the response reports
// no unprocessed items nil is returned; otherwise the leftover items are
// handed to retryUnprocessedWrites and its result is returned. A failure of
// the initial request is wrapped with the prefix "batch write failed".
func (db *DynamoDBClient) BatchWriteItems(ctx context.Context, input *dynamodb.BatchWriteItemInput) error {
	first, err := db.client.BatchWriteItem(ctx, input)
	if err != nil {
		return fmt.Errorf("batch write failed: %w", err)
	}

	// Nothing left over: every write was accepted in one round trip.
	if len(first.UnprocessedItems) == 0 {
		return nil
	}

	// Leftover items are retried with exponential backoff.
	return db.retryUnprocessedWrites(ctx, first.UnprocessedItems)
}
// retryUnprocessedKeys re-requests the keys listed in output.UnprocessedKeys
// until none remain or 5 retries have been made.
//
// Each retry is preceded by a pause that starts at 100ms and doubles up to a
// 20s cap. The pause ends early if ctx is done, in which case nil and a
// wrapped ctx.Err() are returned. Items from each retry are appended to
// output.Responses, so output is modified in place and also returned. If a
// retry request fails, nil and the wrapped error are returned. If keys are
// still unprocessed after the last retry, the partially filled output is
// returned together with an error.
func (db *DynamoDBClient) retryUnprocessedKeys(ctx context.Context, output *dynamodb.BatchGetItemOutput) (*dynamodb.BatchGetItemOutput, error) {
	maxRetries := 5
	backoff := 100 * time.Millisecond

	for attempt := 0; attempt < maxRetries && len(output.UnprocessedKeys) > 0; attempt++ {
		// Wait with exponential backoff, but stop waiting as soon as the
		// caller's context is cancelled or times out.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, fmt.Errorf("retry batch get failed: %w", ctx.Err())
		case <-timer.C:
		}
		backoff = time.Duration(math.Min(float64(backoff*2), float64(20*time.Second)))

		// Retry unprocessed keys.
		retryOutput, err := db.client.BatchGetItem(ctx, &dynamodb.BatchGetItemInput{
			RequestItems: output.UnprocessedKeys,
		})
		if err != nil {
			return nil, fmt.Errorf("retry batch get failed: %w", err)
		}

		// Merge responses. Assigning into a nil map panics, so make sure the
		// destination exists before the first merge.
		if output.Responses == nil && len(retryOutput.Responses) > 0 {
			output.Responses = make(map[string][]map[string]types.AttributeValue, len(retryOutput.Responses))
		}
		for table, items := range retryOutput.Responses {
			output.Responses[table] = append(output.Responses[table], items...)
		}
		output.UnprocessedKeys = retryOutput.UnprocessedKeys
	}

	if len(output.UnprocessedKeys) > 0 {
		return output, fmt.Errorf("failed to process all keys after %d retries", maxRetries)
	}
	return output, nil
}
// retryUnprocessedWrites resubmits unprocessedItems until none remain or 5
// retries have been made.
//
// Each retry is preceded by a pause that starts at 100ms and doubles up to a
// 20s cap. The pause ends early if ctx is done, in which case a wrapped
// ctx.Err() is returned. A failed retry request is returned wrapped with the
// prefix "retry batch write failed". If items are still unprocessed after the
// last retry an error is returned.
func (db *DynamoDBClient) retryUnprocessedWrites(ctx context.Context, unprocessedItems map[string][]types.WriteRequest) error {
	maxRetries := 5
	backoff := 100 * time.Millisecond

	for attempt := 0; attempt < maxRetries && len(unprocessedItems) > 0; attempt++ {
		// Wait with exponential backoff, but stop waiting as soon as the
		// caller's context is cancelled or times out.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("retry batch write failed: %w", ctx.Err())
		case <-timer.C:
		}
		backoff = time.Duration(math.Min(float64(backoff*2), float64(20*time.Second)))

		// Retry unprocessed items.
		output, err := db.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
			RequestItems: unprocessedItems,
		})
		if err != nil {
			return fmt.Errorf("retry batch write failed: %w", err)
		}
		// Whatever is still unprocessed becomes the next attempt's request.
		unprocessedItems = output.UnprocessedItems
	}

	if len(unprocessedItems) > 0 {
		return fmt.Errorf("failed to process all items after %d retries", maxRetries)
	}
	return nil
}
// PutItem forwards a PutItem request to the wrapped client and discards the
// SDK output. A failure is returned wrapped with the prefix
// "put item failed". No retry logic lives in this method; any retrying is
// whatever the wrapped client performs.
func (db *DynamoDBClient) PutItem(ctx context.Context, input *dynamodb.PutItemInput) error {
	if _, err := db.client.PutItem(ctx, input); err != nil {
		return fmt.Errorf("put item failed: %w", err)
	}
	return nil
}
// GetItem forwards a GetItem request to the wrapped client and returns the
// SDK output unchanged. On failure it returns nil and the error wrapped with
// the prefix "get item failed". No retry logic lives in this method; any
// retrying is whatever the wrapped client performs.
func (db *DynamoDBClient) GetItem(ctx context.Context, input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
	result, getErr := db.client.GetItem(ctx, input)
	if getErr != nil {
		return nil, fmt.Errorf("get item failed: %w", getErr)
	}
	return result, nil
}
// Query forwards a Query request to the wrapped client and returns the SDK
// output unchanged. On failure it returns nil and the error wrapped with the
// prefix "query failed". No retry logic lives in this method; any retrying is
// whatever the wrapped client performs.
func (db *DynamoDBClient) Query(ctx context.Context, input *dynamodb.QueryInput) (*dynamodb.QueryOutput, error) {
	result, queryErr := db.client.Query(ctx, input)
	if queryErr != nil {
		return nil, fmt.Errorf("query failed: %w", queryErr)
	}
	return result, nil
}
// UpdateItem forwards an UpdateItem request to the wrapped client and returns
// the SDK output unchanged. On failure it returns nil and the error wrapped
// with the prefix "update item failed". No retry logic lives in this method;
// any retrying is whatever the wrapped client performs.
func (db *DynamoDBClient) UpdateItem(ctx context.Context, input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) {
	result, updateErr := db.client.UpdateItem(ctx, input)
	if updateErr != nil {
		return nil, fmt.Errorf("update item failed: %w", updateErr)
	}
	return result, nil
}
// DeleteItem forwards a DeleteItem request to the wrapped client and discards
// the SDK output. A failure is returned wrapped with the prefix
// "delete item failed". No retry logic lives in this method; any retrying is
// whatever the wrapped client performs.
func (db *DynamoDBClient) DeleteItem(ctx context.Context, input *dynamodb.DeleteItemInput) error {
	if _, err := db.client.DeleteItem(ctx, input); err != nil {
		return fmt.Errorf("delete item failed: %w", err)
	}
	return nil
}