Architecture Decisions
ADR-002: Lambda Architecture

ADR-002: Lambda Function Architecture for AI Analysis Pipeline

Status

ACCEPTED - Implemented October 2025

Context

The Mind Measure platform requires sophisticated AI analysis of multi-modal data (audio, visual, text) to generate wellness scores. The original implementation used Supabase Edge Functions, which had limitations in processing power, execution time, and AI service integrations.

Requirements

  • Multi-modal Analysis: Process audio, visual, and text data simultaneously
  • Scalability: Handle variable loads from multiple universities
  • Performance: Complete analysis within 2 minutes for real-time feedback
  • Cost Efficiency: Pay-per-use model for variable workloads
  • AI Integration: Native integration with AWS AI/ML services
  • Reliability: Fault tolerance and graceful degradation

Constraints

  • Processing Time: Maximum 60 seconds per analysis function
  • Memory Limits: Optimize for cost while maintaining performance
  • Cold Start: Minimize cold start impact on user experience
  • Security: Secure handling of sensitive mental health data
  • Compliance: HIPAA-compliant processing and logging

Decision

We will implement a serverless Lambda function architecture with the following components:

  • analyze-audio: Voice pattern and emotional tone analysis
  • analyze-visual: Facial emotion recognition using AWS Rekognition
  • analyze-text: Natural language processing of conversation transcripts
  • calculate-mind-measure: Multi-modal fusion algorithm for final scoring

Architecture Design

Function Specifications

analyze-audio:
  Runtime: Node.js 18.x
  Memory: 1024 MB
  Timeout: 30 seconds
  Concurrency: 100
  Purpose: Process voice patterns, speech rate, emotional tone
 
analyze-visual:
  Runtime: Node.js 18.x
  Memory: 2048 MB
  Timeout: 60 seconds
  Concurrency: 50
  Purpose: Facial emotion recognition via AWS Rekognition
 
analyze-text:
  Runtime: Node.js 18.x
  Memory: 512 MB
  Timeout: 15 seconds
  Concurrency: 200
  Purpose: NLP analysis of conversation transcripts
 
calculate-mind-measure:
  Runtime: Node.js 18.x
  Memory: 1024 MB
  Timeout: 45 seconds
  Concurrency: 100
  Purpose: Multi-modal fusion scoring algorithm

Processing Pipeline

Assessment Session Creation

    Parallel Analysis
    ┌─────────────────┐
    │                 │
┌───▼────┐  ┌────▼────┐  ┌────▼────┐
│ Audio  │  │ Visual  │  │  Text   │
│Analysis│  │Analysis │  │Analysis │
│Lambda  │  │Lambda   │  │Lambda   │
└───┬────┘  └────┬────┘  └────┬────┘
    │            │            │
    └────────────┼────────────┘

        ┌─────────────────┐
        │ Fusion Algorithm│
        │     Lambda      │
        └─────────────────┘

        ┌─────────────────┐
        │ Score Storage   │
        │   (Database)    │
        └─────────────────┘

Rationale

1. Serverless Benefits

  • Auto-scaling: Automatic scaling based on demand
  • Cost Optimization: Pay only for actual execution time
  • No Infrastructure Management: Focus on business logic
  • High Availability: Built-in redundancy and fault tolerance

2. Function Separation

  • Parallel Processing: Independent analysis functions run simultaneously
  • Specialized Resources: Each function optimized for its specific task
  • Fault Isolation: Failure in one analysis doesn't affect others
  • Independent Scaling: Each function scales based on its own demand

3. AWS Integration

  • Rekognition: Native facial emotion analysis
  • Comprehend: Advanced NLP capabilities (future enhancement)
  • CloudWatch: Comprehensive monitoring and logging
  • X-Ray: Distributed tracing for performance optimization

4. Performance Optimization

  • Memory Allocation: Right-sized for each function's requirements
  • Connection Pooling: Reuse database connections across invocations
  • Caching: In-memory caching for frequently accessed data
  • Warm-up: Provisioned concurrency for critical functions

Implementation Details

Function Structure

// Standard Lambda function template
export const handler = async (event: APIGatewayProxyEvent): Promise<APIGatewayProxyResult> => {
  const startTime = Date.now();
  
  try {
    // 1. Authentication & Authorization
    const user = await verifyJWTToken(event.headers.Authorization);
    
    // 2. Input Validation
    const body = JSON.parse(event.body || '{}');
    validateInput(body);
    
    // 3. Business Logic
    const result = await processAnalysis(body);
    
    // 4. Metrics & Logging
    await recordMetrics('success', Date.now() - startTime);
    
    return {
      statusCode: 200,
      headers: corsHeaders,
      body: JSON.stringify(result)
    };
    
  } catch (error) {
    await recordMetrics('error', Date.now() - startTime);
    return handleError(error);
  }
};

Error Handling & Resilience

// Robust error handling with fallback
export async function processWithFallback<T>(
  primaryFunction: () => Promise<T>,
  fallbackFunction: () => Promise<T>,
  functionName: string
): Promise<T> {
  try {
    return await primaryFunction();
  } catch (error) {
    console.warn(`${functionName} failed, using fallback:`, error);
    return await fallbackFunction();
  }
}
 
// Circuit breaker pattern
class CircuitBreaker {
  private failures = 0;
  private lastFailureTime = 0;
  private readonly threshold = 5;
  private readonly timeout = 60000; // 1 minute
 
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.isOpen()) {
      throw new Error('Circuit breaker is open');
    }
 
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }
 
  private isOpen(): boolean {
    return this.failures >= this.threshold && 
           (Date.now() - this.lastFailureTime) < this.timeout;
  }
}

Database Integration

// Optimized database connection
class DatabaseConnection {
  private static pool: Pool;
 
  static getPool(): Pool {
    if (!this.pool) {
      this.pool = new Pool({
        host: process.env.RDS_HOST,
        port: parseInt(process.env.RDS_PORT || '5432'),
        database: process.env.RDS_DATABASE,
        user: process.env.RDS_USERNAME,
        password: process.env.RDS_PASSWORD,
        ssl: { rejectUnauthorized: false },
        max: 5, // Limit connections per Lambda
        idleTimeoutMillis: 30000,
        connectionTimeoutMillis: 10000,
      });
    }
    return this.pool;
  }
}

API Gateway Configuration

Endpoint Structure

Base URL: https://4xg1jsjh7k.execute-api.eu-west-2.amazonaws.com/dev
 
Endpoints:
  POST /analyze-audio:
    Integration: Lambda (analyze-audio)
    Auth: Cognito User Pool
    Rate Limit: 10 req/min per user
    
  POST /analyze-visual:
    Integration: Lambda (analyze-visual)
    Auth: Cognito User Pool
    Rate Limit: 5 req/min per user
    
  POST /analyze-text:
    Integration: Lambda (analyze-text)
    Auth: Cognito User Pool
    Rate Limit: 20 req/min per user
    
  POST /calculate-mind-measure:
    Integration: Lambda (calculate-mind-measure)
    Auth: Cognito User Pool
    Rate Limit: 10 req/min per user

CORS Configuration

{
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Headers": "Content-Type,Authorization",
  "Access-Control-Allow-Methods": "POST,OPTIONS",
  "Access-Control-Max-Age": "86400"
}

Security Considerations

Authentication & Authorization

// JWT token verification
async function verifyJWTToken(authHeader: string): Promise<CognitoUser> {
  const token = authHeader?.replace('Bearer ', '');
  if (!token) {
    throw new UnauthorizedError('Missing authorization token');
  }
 
  return new Promise((resolve, reject) => {
    jwt.verify(token, getSigningKey, {
      audience: process.env.COGNITO_CLIENT_ID,
      issuer: `https://cognito-idp.${process.env.AWS_REGION}.amazonaws.com/${process.env.COGNITO_USER_POOL_ID}`,
      algorithms: ['RS256']
    }, (err, decoded) => {
      if (err) reject(new UnauthorizedError('Invalid token'));
      else resolve(decoded as CognitoUser);
    });
  });
}

Data Encryption

// Encrypt sensitive data before storage
import { KMSClient, EncryptCommand } from '@aws-sdk/client-kms';
 
const kmsClient = new KMSClient({ region: process.env.AWS_REGION });
 
async function encryptSensitiveData(data: any): Promise<string> {
  const command = new EncryptCommand({
    KeyId: process.env.KMS_KEY_ID,
    Plaintext: Buffer.from(JSON.stringify(data))
  });
  
  const result = await kmsClient.send(command);
  return Buffer.from(result.CiphertextBlob!).toString('base64');
}

Monitoring & Observability

CloudWatch Metrics

// Custom metrics
import { CloudWatchClient, PutMetricDataCommand } from '@aws-sdk/client-cloudwatch';
 
class MetricsService {
  private cloudWatch = new CloudWatchClient({ region: process.env.AWS_REGION });
 
  async recordAnalysisMetrics(functionName: string, duration: number, success: boolean) {
    const metrics = [
      {
        MetricName: 'Duration',
        Value: duration,
        Unit: 'Milliseconds',
        Dimensions: [{ Name: 'FunctionName', Value: functionName }]
      },
      {
        MetricName: success ? 'Success' : 'Error',
        Value: 1,
        Unit: 'Count',
        Dimensions: [{ Name: 'FunctionName', Value: functionName }]
      }
    ];
 
    await this.cloudWatch.send(new PutMetricDataCommand({
      Namespace: 'MindMeasure/Analysis',
      MetricData: metrics
    }));
  }
}

Distributed Tracing

// X-Ray tracing
import AWSXRay from 'aws-xray-sdk-core';
 
export const handler = AWSXRay.captureAsyncFunc('analyze-audio', async (event) => {
  const segment = AWSXRay.getSegment();
  
  segment?.addAnnotation('sessionId', event.body.sessionId);
  segment?.addMetadata('requestData', event.body);
  
  try {
    const result = await processAnalysis(event.body);
    segment?.addMetadata('result', result);
    return result;
  } catch (error) {
    segment?.addError(error);
    throw error;
  }
});

Performance Optimization

Cold Start Mitigation

// Global variables for connection reuse
let dbPool: Pool;
let rekognitionClient: RekognitionClient;
 
export const handler = async (event: APIGatewayProxyEvent) => {
  // Initialize connections outside handler for reuse
  if (!dbPool) {
    dbPool = new Pool(dbConfig);
  }
  
  if (!rekognitionClient) {
    rekognitionClient = new RekognitionClient({ region: process.env.AWS_REGION });
  }
  
  // Handler logic here
};

Provisioned Concurrency

# For critical functions with consistent traffic
ProvisionedConcurrency:
  calculate-mind-measure: 5 # Always keep 5 warm instances
  analyze-audio: 3          # Keep 3 warm instances

Memory Optimization

// Memory usage monitoring
process.on('beforeExit', () => {
  const memUsage = process.memoryUsage();
  console.log('Memory usage:', {
    rss: Math.round(memUsage.rss / 1024 / 1024) + 'MB',
    heapUsed: Math.round(memUsage.heapUsed / 1024 / 1024) + 'MB',
    heapTotal: Math.round(memUsage.heapTotal / 1024 / 1024) + 'MB'
  });
});

Cost Optimization

Function Sizing

# Optimized memory allocation based on profiling
analyze-audio:
  Memory: 1024 MB  # Optimal for audio processing
  Cost: ~$0.0000166667 per 100ms
 
analyze-visual:
  Memory: 2048 MB  # Required for image processing
  Cost: ~$0.0000333333 per 100ms
 
analyze-text:
  Memory: 512 MB   # Sufficient for text processing
  Cost: ~$0.0000083333 per 100ms
 
calculate-mind-measure:
  Memory: 1024 MB  # Optimal for fusion algorithm
  Cost: ~$0.0000166667 per 100ms

Execution Time Optimization

// Parallel processing within functions
async function processAnalysis(data: any) {
  const [audioResult, visualResult, textResult] = await Promise.all([
    processAudioData(data.audio),
    processVisualData(data.visual),
    processTextData(data.text)
  ]);
  
  return combineResults(audioResult, visualResult, textResult);
}

Testing Strategy

Unit Testing

// Lambda function unit tests
describe('analyze-audio Lambda', () => {
  test('processes valid audio data', async () => {
    const event = createMockEvent({
      sessionId: 'test-123',
      audioData: mockAudioData
    });
 
    const result = await handler(event);
    
    expect(result.statusCode).toBe(200);
    const body = JSON.parse(result.body);
    expect(body.analysis).toBeDefined();
    expect(body.sessionId).toBe('test-123');
  });
 
  test('handles missing sessionId', async () => {
    const event = createMockEvent({ audioData: mockAudioData });
 
    const result = await handler(event);
    
    expect(result.statusCode).toBe(400);
    const body = JSON.parse(result.body);
    expect(body.error).toBe('sessionId is required');
  });
});

Integration Testing

// End-to-end pipeline testing
describe('Analysis Pipeline Integration', () => {
  test('completes full analysis pipeline', async () => {
    const sessionId = await createTestSession();
    
    // Run parallel analysis
    const [audioResult, visualResult, textResult] = await Promise.all([
      invokeFunction('analyze-audio', { sessionId, audioData: mockData }),
      invokeFunction('analyze-visual', { sessionId, imageData: mockImage }),
      invokeFunction('analyze-text', { sessionId, conversationText: mockText })
    ]);
    
    // Calculate fusion score
    const fusionResult = await invokeFunction('calculate-mind-measure', { sessionId });
    
    expect(fusionResult.fusion_score.final_score).toBeGreaterThan(0);
    expect(fusionResult.fusion_score.final_score).toBeLessThanOrEqual(100);
  });
});

Deployment Configuration

Serverless Framework

# serverless.yml
service: mindmeasure-analysis-functions
 
provider:
  name: aws
  runtime: nodejs18.x
  region: eu-west-2
  stage: ${opt:stage, 'dev'}
  
  environment:
    RDS_HOST: ${env:RDS_HOST}
    COGNITO_USER_POOL_ID: ${env:COGNITO_USER_POOL_ID}
    KMS_KEY_ID: ${env:KMS_KEY_ID}
  
  iamRoleStatements:
    - Effect: Allow
      Action:
        - rds-data:*
        - rekognition:*
        - kms:Encrypt
        - kms:Decrypt
      Resource: "*"
 
functions:
  analyzeAudio:
    handler: src/analyze-audio.handler
    timeout: 30
    memorySize: 1024
    reservedConcurrency: 100
    events:
      - http:
          path: /analyze-audio
          method: post
          cors: true
          authorizer:
            type: COGNITO_USER_POOLS
            authorizerId: !Ref ApiGatewayAuthorizer
 
  analyzeVisual:
    handler: src/analyze-visual.handler
    timeout: 60
    memorySize: 2048
    reservedConcurrency: 50
    events:
      - http:
          path: /analyze-visual
          method: post
          cors: true
 
  analyzeText:
    handler: src/analyze-text.handler
    timeout: 15
    memorySize: 512
    reservedConcurrency: 200
    events:
      - http:
          path: /analyze-text
          method: post
          cors: true
 
  calculateMindMeasure:
    handler: src/calculate-mind-measure.handler
    timeout: 45
    memorySize: 1024
    reservedConcurrency: 100
    provisionedConcurrency: 5
    events:
      - http:
          path: /calculate-mind-measure
          method: post
          cors: true

Consequences

Positive

  • Scalability: Automatic scaling based on demand
  • Cost Efficiency: Pay-per-use model reduces costs during low usage
  • Performance: Parallel processing reduces total analysis time
  • Reliability: Built-in fault tolerance and retry mechanisms
  • Maintainability: Clear separation of concerns and single responsibility
  • Monitoring: Comprehensive observability with CloudWatch and X-Ray

Negative

  • Cold Starts: Initial invocation latency (mitigated with provisioned concurrency)
  • Complexity: More functions to manage and monitor
  • Debugging: Distributed system debugging is more complex
  • Vendor Lock-in: Increased dependency on AWS Lambda ecosystem

Neutral

  • Development: Similar development patterns with different deployment model
  • Testing: Requires additional integration testing for distributed functions

Success Metrics

Performance

  • Analysis Completion Time: < 90 seconds for 95% of assessments
  • Cold Start Impact: < 5% of invocations affected by cold starts
  • Error Rate: < 1% of function invocations result in errors
  • Concurrent Processing: Support 100+ simultaneous assessments

Cost

  • Cost per Analysis: < $0.10 per complete assessment
  • Idle Costs: $0 when no assessments are running
  • Scaling Efficiency: Linear cost scaling with usage

Reliability

  • Availability: > 99.9% uptime for analysis functions
  • Fault Tolerance: Graceful degradation when individual functions fail
  • Data Integrity: 100% data consistency across all analysis functions

Future Enhancements

Planned Improvements

  • ML Model Integration: Custom ML models for university-specific analysis
  • Real-time Processing: WebSocket integration for live analysis updates
  • Batch Processing: Support for bulk analysis of historical data
  • Advanced Analytics: Trend analysis and predictive modeling

Monitoring Enhancements

  • Custom Dashboards: Real-time analysis pipeline monitoring
  • Alerting: Proactive alerts for performance degradation
  • Cost Tracking: Detailed cost analysis per university
  • Performance Optimization: Continuous optimization based on metrics

Related ADRs


Author: Development Team
Date: October 20, 2025
Status: Accepted and Implemented
Last Updated: October 28, 2025