package storage

import (
	"context"
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/retry"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// DynamoDBClient wraps the AWS SDK v2 DynamoDB client and adds table
// bootstrap helpers plus thin, error-wrapping request methods.
type DynamoDBClient struct {
	client *dynamodb.Client
}

// NewDynamoDBClient loads the default AWS configuration, installs a standard
// retryer (5 attempts, exponential jitter backoff capped at 20s) and builds a
// DynamoDB client from it.
//
// If endpoint is non-empty it replaces the SDK's resolved base endpoint, which
// is how a local DynamoDB instance is targeted. An error is returned only when
// the AWS configuration cannot be loaded.
func NewDynamoDBClient(ctx context.Context, endpoint string) (*DynamoDBClient, error) {
	const (
		maxAttempts = 5
		maxBackoff  = 20 * time.Second
	)

	retryer := retry.NewStandard(func(o *retry.StandardOptions) {
		o.MaxAttempts = maxAttempts
		o.MaxBackoff = maxBackoff
		o.Backoff = retry.NewExponentialJitterBackoff(maxBackoff)
	})

	// No HTTP client option is passed: the SDK then falls back to its own
	// default client. (The previous code passed a nil client explicitly,
	// which is believed to have the same effect but reads like a mistake.)
	cfg, err := config.LoadDefaultConfig(ctx,
		config.WithRetryer(func() aws.Retryer { return retryer }),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Override endpoint if provided (for local DynamoDB).
	if endpoint != "" {
		cfg.BaseEndpoint = aws.String(endpoint)
	}

	return &DynamoDBClient{client: dynamodb.NewFromConfig(cfg)}, nil
}

// attributeDef builds one AttributeDefinition for a key attribute.
func attributeDef(name string, kind types.ScalarAttributeType) types.AttributeDefinition {
	return types.AttributeDefinition{AttributeName: aws.String(name), AttributeType: kind}
}

// keyElement builds one KeySchemaElement (hash or range) for a table or index.
func keyElement(name string, kind types.KeyType) types.KeySchemaElement {
	return types.KeySchemaElement{AttributeName: aws.String(name), KeyType: kind}
}

// ensureTable creates the table described by input unless it already exists,
// then blocks on the SDK's TableExists waiter for up to five minutes.
//
// Behavior:
//   - DescribeTable succeeds: the table exists, nil is returned immediately.
//   - DescribeTable fails with anything other than ResourceNotFoundException:
//     the error is returned instead of blindly attempting a create.
//   - CreateTable fails with ResourceInUseException: another caller created
//     the table concurrently, so we fall through and wait for it.
func (db *DynamoDBClient) ensureTable(ctx context.Context, input *dynamodb.CreateTableInput) error {
	// The waiter takes a time.Duration; a bare 5*60 would mean 300ns.
	const activeTimeout = 5 * time.Minute

	name := aws.ToString(input.TableName)
	describe := &dynamodb.DescribeTableInput{TableName: input.TableName}

	_, err := db.client.DescribeTable(ctx, describe)
	if err == nil {
		// Table already exists.
		return nil
	}
	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("failed to describe %s table: %w", name, err)
	}

	if _, err := db.client.CreateTable(ctx, input); err != nil {
		var inUse *types.ResourceInUseException
		if !errors.As(err, &inUse) {
			return fmt.Errorf("failed to create %s table: %w", name, err)
		}
	}

	waiter := dynamodb.NewTableExistsWaiter(db.client)
	if err := waiter.Wait(ctx, describe, activeTimeout); err != nil {
		return fmt.Errorf("failed waiting for %s table to be active: %w", name, err)
	}
	return nil
}

// CreateUsersTable creates the Users table (partition key user_id) if it does
// not already exist.
func (db *DynamoDBClient) CreateUsersTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("Users"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("user_id", types.ScalarAttributeTypeS),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("user_id", types.KeyTypeHash), // Partition key
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreatePagesTable creates the Pages table with composite key
// (user_id, page_id) if it does not already exist.
func (db *DynamoDBClient) CreatePagesTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("Pages"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("user_id", types.ScalarAttributeTypeS),
			attributeDef("page_id", types.ScalarAttributeTypeS),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("user_id", types.KeyTypeHash),  // Partition key
			keyElement("page_id", types.KeyTypeRange), // Sort key
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreateWidgetsTable creates the Widgets table with composite key
// (page_id, widget_id) if it does not already exist.
func (db *DynamoDBClient) CreateWidgetsTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("Widgets"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("page_id", types.ScalarAttributeTypeS),
			attributeDef("widget_id", types.ScalarAttributeTypeS),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("page_id", types.KeyTypeHash),    // Partition key
			keyElement("widget_id", types.KeyTypeRange), // Sort key
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreateBookmarksTable creates the Bookmarks table with composite key
// (widget_id, bookmark_id) and the UserBookmarksIndex GSI keyed on
// (user_id, created_at) with all attributes projected.
func (db *DynamoDBClient) CreateBookmarksTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("Bookmarks"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("widget_id", types.ScalarAttributeTypeS),
			attributeDef("bookmark_id", types.ScalarAttributeTypeS),
			attributeDef("user_id", types.ScalarAttributeTypeS),
			attributeDef("created_at", types.ScalarAttributeTypeN),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("widget_id", types.KeyTypeHash),    // Partition key
			keyElement("bookmark_id", types.KeyTypeRange), // Sort key
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("UserBookmarksIndex"),
				KeySchema: []types.KeySchemaElement{
					keyElement("user_id", types.KeyTypeHash),     // Partition key
					keyElement("created_at", types.KeyTypeRange), // Sort key
				},
				Projection: &types.Projection{
					ProjectionType: types.ProjectionTypeAll,
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreateNotesTable creates the Notes table with composite key
// (widget_id, note_id) and the UserNotesIndex GSI keyed on
// (user_id, created_at) with all attributes projected.
func (db *DynamoDBClient) CreateNotesTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("Notes"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("widget_id", types.ScalarAttributeTypeS),
			attributeDef("note_id", types.ScalarAttributeTypeS),
			attributeDef("user_id", types.ScalarAttributeTypeS),
			attributeDef("created_at", types.ScalarAttributeTypeN),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("widget_id", types.KeyTypeHash), // Partition key
			keyElement("note_id", types.KeyTypeRange),  // Sort key
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("UserNotesIndex"),
				KeySchema: []types.KeySchemaElement{
					keyElement("user_id", types.KeyTypeHash),     // Partition key
					keyElement("created_at", types.KeyTypeRange), // Sort key
				},
				Projection: &types.Projection{
					ProjectionType: types.ProjectionTypeAll,
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreateTagAssociationsTable creates the TagAssociations table with composite
// key (item_id, tag_name) and the TagItemsIndex GSI keyed on
// (tag_name, user_id) with an INCLUDE projection.
func (db *DynamoDBClient) CreateTagAssociationsTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("TagAssociations"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("item_id", types.ScalarAttributeTypeS),
			attributeDef("tag_name", types.ScalarAttributeTypeS),
			attributeDef("user_id", types.ScalarAttributeTypeS),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("item_id", types.KeyTypeHash),   // Partition key
			keyElement("tag_name", types.KeyTypeRange), // Sort key
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("TagItemsIndex"),
				KeySchema: []types.KeySchemaElement{
					keyElement("tag_name", types.KeyTypeHash), // Partition key
					keyElement("user_id", types.KeyTypeRange), // Sort key
				},
				Projection: &types.Projection{
					ProjectionType:   types.ProjectionTypeInclude,
					NonKeyAttributes: []string{"item_id", "item_type", "created_at"},
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreateGroupsTable creates the Groups table with composite key
// (widget_id, group_id) if it does not already exist.
func (db *DynamoDBClient) CreateGroupsTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("Groups"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("widget_id", types.ScalarAttributeTypeS),
			attributeDef("group_id", types.ScalarAttributeTypeS),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("widget_id", types.KeyTypeHash), // Partition key
			keyElement("group_id", types.KeyTypeRange), // Sort key
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreateSharedItemsTable creates the SharedItems table (partition key
// share_id) and the UserSharesIndex GSI keyed on (user_id, created_at) with an
// INCLUDE projection.
func (db *DynamoDBClient) CreateSharedItemsTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("SharedItems"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("share_id", types.ScalarAttributeTypeS),
			attributeDef("user_id", types.ScalarAttributeTypeS),
			attributeDef("created_at", types.ScalarAttributeTypeN),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("share_id", types.KeyTypeHash), // Partition key
		},
		GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
			{
				IndexName: aws.String("UserSharesIndex"),
				KeySchema: []types.KeySchemaElement{
					keyElement("user_id", types.KeyTypeHash),     // Partition key
					keyElement("created_at", types.KeyTypeRange), // Sort key
				},
				Projection: &types.Projection{
					ProjectionType:   types.ProjectionTypeInclude,
					NonKeyAttributes: []string{"share_id", "item_id", "item_type", "access_count"},
				},
			},
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// CreatePreferencesTable creates the Preferences table (partition key user_id)
// if it does not already exist.
func (db *DynamoDBClient) CreatePreferencesTable(ctx context.Context) error {
	return db.ensureTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String("Preferences"),
		AttributeDefinitions: []types.AttributeDefinition{
			attributeDef("user_id", types.ScalarAttributeTypeS),
		},
		KeySchema: []types.KeySchemaElement{
			keyElement("user_id", types.KeyTypeHash), // Partition key
		},
		BillingMode: types.BillingModePayPerRequest,
	})
}

// GetClient returns the underlying DynamoDB client.
func (db *DynamoDBClient) GetClient() *dynamodb.Client {
	return db.client
}

// TransactWriteItems executes a transactional write. The method itself makes a
// single call; any retrying is done by the retryer installed on the client.
// Errors are wrapped with the prefix "transaction write failed".
func (db *DynamoDBClient) TransactWriteItems(ctx context.Context, input *dynamodb.TransactWriteItemsInput) error {
	if _, err := db.client.TransactWriteItems(ctx, input); err != nil {
		return fmt.Errorf("transaction write failed: %w", err)
	}
	return nil
}

// BatchGetItems retrieves multiple items in a single batch operation. When the
// response reports unprocessed keys, they are handed to retryUnprocessedKeys
// and its merged result (or error) is returned.
func (db *DynamoDBClient) BatchGetItems(ctx context.Context, input *dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) {
	output, err := db.client.BatchGetItem(ctx, input)
	if err != nil {
		return nil, fmt.Errorf("batch get failed: %w", err)
	}

	// Handle unprocessed keys with exponential backoff.
	if len(output.UnprocessedKeys) > 0 {
		return db.retryUnprocessedKeys(ctx, output)
	}
	return output, nil
}

//
BatchWriteItems writes multiple items in a single batch operation func (db *DynamoDBClient) BatchWriteItems(ctx context.Context, input *dynamodb.BatchWriteItemInput) error { output, err := db.client.BatchWriteItem(ctx, input) if err != nil { return fmt.Errorf("batch write failed: %w", err) } // Handle unprocessed items with exponential backoff if len(output.UnprocessedItems) > 0 { return db.retryUnprocessedWrites(ctx, output.UnprocessedItems) } return nil } // retryUnprocessedKeys retries unprocessed keys from BatchGetItem with exponential backoff func (db *DynamoDBClient) retryUnprocessedKeys(ctx context.Context, output *dynamodb.BatchGetItemOutput) (*dynamodb.BatchGetItemOutput, error) { maxRetries := 5 backoff := 100 * time.Millisecond for attempt := 0; attempt < maxRetries && len(output.UnprocessedKeys) > 0; attempt++ { // Wait with exponential backoff time.Sleep(backoff) backoff = time.Duration(math.Min(float64(backoff*2), float64(20*time.Second))) // Retry unprocessed keys retryOutput, err := db.client.BatchGetItem(ctx, &dynamodb.BatchGetItemInput{ RequestItems: output.UnprocessedKeys, }) if err != nil { return nil, fmt.Errorf("retry batch get failed: %w", err) } // Merge responses for table, items := range retryOutput.Responses { output.Responses[table] = append(output.Responses[table], items...) 
} output.UnprocessedKeys = retryOutput.UnprocessedKeys } if len(output.UnprocessedKeys) > 0 { return output, fmt.Errorf("failed to process all keys after %d retries", maxRetries) } return output, nil } // retryUnprocessedWrites retries unprocessed items from BatchWriteItem with exponential backoff func (db *DynamoDBClient) retryUnprocessedWrites(ctx context.Context, unprocessedItems map[string][]types.WriteRequest) error { maxRetries := 5 backoff := 100 * time.Millisecond for attempt := 0; attempt < maxRetries && len(unprocessedItems) > 0; attempt++ { // Wait with exponential backoff time.Sleep(backoff) backoff = time.Duration(math.Min(float64(backoff*2), float64(20*time.Second))) // Retry unprocessed items output, err := db.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ RequestItems: unprocessedItems, }) if err != nil { return fmt.Errorf("retry batch write failed: %w", err) } unprocessedItems = output.UnprocessedItems } if len(unprocessedItems) > 0 { return fmt.Errorf("failed to process all items after %d retries", maxRetries) } return nil } // PutItem puts a single item with automatic retry func (db *DynamoDBClient) PutItem(ctx context.Context, input *dynamodb.PutItemInput) error { _, err := db.client.PutItem(ctx, input) if err != nil { return fmt.Errorf("put item failed: %w", err) } return nil } // GetItem retrieves a single item with automatic retry func (db *DynamoDBClient) GetItem(ctx context.Context, input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { output, err := db.client.GetItem(ctx, input) if err != nil { return nil, fmt.Errorf("get item failed: %w", err) } return output, nil } // Query executes a query operation with automatic retry func (db *DynamoDBClient) Query(ctx context.Context, input *dynamodb.QueryInput) (*dynamodb.QueryOutput, error) { output, err := db.client.Query(ctx, input) if err != nil { return nil, fmt.Errorf("query failed: %w", err) } return output, nil } // UpdateItem updates a single item with automatic retry 
func (db *DynamoDBClient) UpdateItem(ctx context.Context, input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) { output, err := db.client.UpdateItem(ctx, input) if err != nil { return nil, fmt.Errorf("update item failed: %w", err) } return output, nil } // DeleteItem deletes a single item with automatic retry func (db *DynamoDBClient) DeleteItem(ctx context.Context, input *dynamodb.DeleteItemInput) error { _, err := db.client.DeleteItem(ctx, input) if err != nil { return fmt.Errorf("delete item failed: %w", err) } return nil }