Aller au contenu

Gestion des Erreurs & Fallbacks

Version: 1.0.0
Date: 11 Mars 2026
Statut: Production


Table des Matières

  1. Philosophie de Gestion d'Erreurs
  2. Fallback Chains
  3. Retry Policies
  4. Error Recovery
  5. Graceful Degradation
  6. Error Logging

1. Philosophie de Gestion d'Erreurs

1.1 Principes Fondamentaux

graph TB
    subgraph P1["NEVER FAIL SILENTLY"]
        P1A["Log all errors with context"]
        P1B["Rollback database transactions"]
        P1C["Return meaningful error messages"]
        P1D["Track errors for debugging"]
    end
    subgraph P2["ALWAYS HAVE FALLBACKS"]
        P2A["LLM → Regex → Empty"]
        P2B["Semantic chunking → Sentence splitting → Fixed size"]
        P2C["Voiceprint → RAG → LLM → 'Intervenant X'"]
    end
    subgraph P3["GRACEFUL DEGRADATION"]
        P3A["Continue with partial results"]
        P3B["User gets SOMETHING (not nothing)"]
        P3C["Mark degraded quality clearly"]
    end
    style P1 fill:#fee2e2,stroke:#dc2626,stroke-width:2px
    style P2 fill:#fef3c7,stroke:#f59e0b,stroke-width:2px
    style P3 fill:#d1fae5,stroke:#10b981,stroke-width:2px

1.2 Error Categories

Category Examples Impact Strategy
Transient Network timeout, DB connection Low Retry with exponential backoff
Client Error Invalid file format, missing field Medium Validate early, return 400
Server Error OOM, GPU unavailable High Fallback + alert + rollback
Data Quality Empty chunks, invalid embeddings Variable Degrade gracefully, log

2. Fallback Chains

2.1 LLM Metadata Extraction

graph LR
    A[Raw Text] --> B{LLM<br/>GPT-4o-mini}
    B -->|Success| C[Structured Metadata]
    B -->|Timeout/Error| D{Regex<br/>Patterns}
    D -->|Match| E[Basic Metadata]
    D -->|No Match| F[Empty Metadata]
    C --> G[Continue RAG]
    E --> G
    F --> G
    style B fill:#10b981,stroke:#fff,color:#fff
    style D fill:#f59e0b,stroke:#fff,color:#fff
    style F fill:#ef4444,stroke:#fff,color:#fff

Implémentation:

class MetadataExtractor:
    """Extract meeting metadata (participants, context, glossary) from raw text.

    Uses a 3-tier fallback chain — LLM (GPT-4o-mini) → regex patterns → empty
    metadata — so downstream RAG processing always receives *something*.
    """

    def __init__(self):
        # Fix: _extract_with_llm awaits the completion call, so the client must
        # be the async variant; the original sync OpenAI() client returns a
        # plain response object that cannot be awaited.
        self.llm_client = AsyncOpenAI()
        # Number of times tier 1 (LLM) failed and a fallback tier was used.
        self.fallback_count = 0

    async def extract_metadata(
        self,
        text: str,
        max_retries: int = 3
    ) -> Dict[str, Any]:
        """
        3-tier fallback: LLM → Regex → Empty

        Args:
            text: Raw document text to analyze.
            max_retries: Retry budget for the LLM tier (must be >= 1).

        Returns:
            Metadata dict; "extraction_method" (when present) indicates which
            fallback tier produced the result.
        """
        # Tier 1: LLM Extraction (GPT-4o-mini)
        try:
            metadata = await self._extract_with_llm(text, max_retries)
            logger.info("DEBUG: Metadata extracted with LLM")
            return metadata

        except Exception as e:
            logger.warning(
                f"DEBUG: LLM extraction failed: {str(e)}, "
                f"falling back to regex"
            )
            self.fallback_count += 1

        # Tier 2: Regex Fallback
        try:
            metadata = self._extract_with_regex(text)
            logger.info("DEBUG: Metadata extracted with regex fallback")
            return metadata

        except Exception as e:
            logger.error(
                f"DEBUG: Regex extraction failed: {str(e)}, "
                f"returning empty metadata"
            )

        # Tier 3: Empty Metadata (Continue Processing)
        return {
            "participants": [],
            "project_context": {},
            "glossary": {},
            "extraction_method": "empty_fallback"
        }

    async def _extract_with_llm(
        self,
        text: str,
        max_retries: int
    ) -> Dict[str, Any]:
        """LLM extraction with retry on rate limits and transient network errors.

        Raises:
            ValueError: If max_retries < 1 (the original loop silently
                returned None in that case).
            openai.RateLimitError / APITimeoutError / APIConnectionError:
                When the retry budget is exhausted.
        """
        if max_retries < 1:
            # Fix: with max_retries=0 the original loop body never ran and the
            # function implicitly returned None, which callers treated as
            # valid metadata.
            raise ValueError("max_retries must be >= 1")

        for attempt in range(1, max_retries + 1):
            try:
                response = await self.llm_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": METADATA_EXTRACTION_PROMPT},
                        {"role": "user", "content": text}
                    ],
                    temperature=0.0,
                    timeout=30.0
                )

                # Parse JSON response; a malformed payload raises and is
                # handled by the caller's tier-2 fallback (not retried here).
                metadata = json.loads(response.choices[0].message.content)
                return metadata

            except openai.RateLimitError:
                if attempt < max_retries:
                    wait_time = 2 ** attempt  # Exponential backoff
                    logger.warning(
                        f"DEBUG: Rate limit hit, retrying in {wait_time}s "
                        f"(attempt {attempt}/{max_retries})"
                    )
                    await asyncio.sleep(wait_time)
                else:
                    raise

            except (openai.APITimeoutError, openai.APIConnectionError):
                if attempt < max_retries:
                    logger.warning(
                        f"DEBUG: Timeout/connection error, retrying "
                        f"(attempt {attempt}/{max_retries})"
                    )
                    await asyncio.sleep(1)
                else:
                    raise

    def _extract_with_regex(self, text: str) -> Dict[str, Any]:
        """Regex fallback for basic metadata (participants, emails, phones)."""
        participants = []

        # Pattern: "Name - Role" or "Name, Role" (two+ capitalized words)
        name_role_pattern = r"([A-Z][a-z]+(?: [A-Z][a-z]+)+)\s*[-,]\s*(.+?)(?:\n|$)"
        matches = re.findall(name_role_pattern, text)

        for name, role in matches:
            participants.append({
                "name": name.strip(),
                "role": role.strip(),
                "email": None,
                "phone": None
            })

        # Pattern: email. Fix: the original TLD class was [A-Z|a-z], which
        # also matched a literal '|' character inside the TLD.
        email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"
        emails = re.findall(email_pattern, text)

        # Pattern: French phone numbers (+33 or 0 prefix)
        phone_pattern = r"(?:\+33|0)[1-9](?:\s?\d{2}){4}"
        phones = re.findall(phone_pattern, text)

        return {
            "participants": participants,
            "project_context": {},
            "glossary": {},
            "extraction_method": "regex_fallback",
            "confidence": 0.5,  # Lower confidence than LLM extraction
            "emails_found": emails,
            "phones_found": phones
        }

Taux de Succès (Production):
- Tier 1 (LLM): 92% success
- Tier 2 (Regex): 6% success (fallback from LLM)
- Tier 3 (Empty): 2% (continue processing without metadata)

2.2 Text Chunking

graph LR
    A[Document Text] --> B{Semantic<br/>Splitter}
    B -->|Success| C[Semantic Chunks]
    B -->|Error| D{Sentence<br/>Splitter}
    D -->|Success| E[Sentence Chunks]
    D -->|Error| F[Fixed-Size Chunks]
    C --> G[Embed & Index]
    E --> G
    F --> G
    style B fill:#10b981,stroke:#fff,color:#fff
    style D fill:#f59e0b,stroke:#fff,color:#fff
    style F fill:#ef4444,stroke:#fff,color:#fff

Implémentation:

class ChunkingService:
    """Splits raw document text into chunks using a 3-tier fallback chain."""

    def chunk_text(
        self,
        text: str,
        max_chunk_size: int = 2000
    ) -> List[str]:
        """
        3-tier chunking: Semantic → Sentence → Fixed.

        Args:
            text: Full document text.
            max_chunk_size: Character budget per chunk. A single sentence
                longer than the budget may still exceed it in tier 2.

        Returns:
            Non-empty list of chunk strings (tier 3 cannot fail).
        """
        # Tier 1: Semantic Chunking (LlamaIndex)
        try:
            splitter = SemanticSplitter(
                buffer_size=1,
                breakpoint_percentile_threshold=95,
                embed_model=self.embed_model
            )

            chunks = splitter.split_text(text)

            # Validate: accept only non-empty output where every chunk
            # respects the size budget.
            if chunks and all(len(c) <= max_chunk_size for c in chunks):
                logger.info(
                    f"DEBUG: Semantic chunking success - {len(chunks)} chunks"
                )
                return chunks

            # Fix: the original fell through *silently* when validation
            # failed; log the degradation so it can be diagnosed.
            logger.warning(
                "DEBUG: Semantic chunking produced invalid chunks, "
                "falling back to sentence splitting"
            )

        except Exception as e:
            logger.warning(
                f"DEBUG: Semantic chunking failed: {str(e)}, "
                f"falling back to sentence splitting"
            )

        # Tier 2: Sentence Splitting (NLTK)
        try:
            sentences = nltk.sent_tokenize(text, language='french')

            # Group sentences into chunks under the size budget.
            chunks = []
            current_chunk = []
            current_length = 0

            for sentence in sentences:
                # Fix: count the " " separator used by the join below; the
                # original under-counted and could overshoot the budget by
                # one character per sentence.
                sep = 1 if current_chunk else 0
                if current_chunk and current_length + sep + len(sentence) > max_chunk_size:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = []
                    current_length = 0
                    sep = 0

                current_chunk.append(sentence)
                current_length += len(sentence) + sep

            if current_chunk:
                chunks.append(" ".join(current_chunk))

            logger.info(
                f"DEBUG: Sentence splitting success - {len(chunks)} chunks"
            )
            return chunks

        except Exception as e:
            logger.error(
                f"DEBUG: Sentence splitting failed: {str(e)}, "
                f"falling back to fixed-size chunks"
            )

        # Tier 3: Fixed-Size Chunks (Last Resort — always succeeds)
        chunks = [
            text[i:i + max_chunk_size]
            for i in range(0, len(text), max_chunk_size)
        ]

        logger.warning(
            f"DEBUG: Using fixed-size chunking (degraded quality) - "
            f"{len(chunks)} chunks"
        )

        return chunks

2.3 Speaker Identification

graph LR
    A[GPU Segment<br/>SPEAKER_00] --> B{Priority 1:<br/>Voiceprint}
    B -->|Match > 0.85| C[Identified]
    B -->|< 0.85| D{Priority 2:<br/>RAG}
    D -->|Enriched| E[Identified<br/>+ Metadata]
    D -->|No Match| F{Priority 3:<br/>LLM}
    F -->|Confidence > 0.75| G[Identified<br/>Pending Voiceprint]
    F -->|< 0.75| H[Intervenant X<br/>Graceful Degradation]
    style H fill:#ef4444,stroke:#fff,color:#fff

3. Retry Policies

3.1 Exponential Backoff

class RetryPolicy:
    """Reusable retry helper: exponential backoff with optional jitter."""

    @staticmethod
    async def retry_with_backoff(
        func: Callable,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        exponential_base: float = 2,
        jitter: bool = True
    ):
        """
        Retry `func` with exponential backoff and jitter.

        Delays (base_delay=1, exponential_base=2):
            Attempt 1: 1s
            Attempt 2: 2s
            Attempt 3: 4s

        Jitter: random ±25% to avoid thundering-herd retries.

        Args:
            func: Zero-argument coroutine function to retry.
            max_retries: Total number of attempts (must be >= 1).
            base_delay: Delay before the first retry, in seconds.
            max_delay: Upper bound on any single delay.
            exponential_base: Growth factor between attempts.
            jitter: Whether to randomize each delay by ±25%.

        Raises:
            ValueError: If max_retries < 1.
            Exception: The last error once the retry budget is exhausted.
        """
        if max_retries < 1:
            # Fix: the original loop never ran for max_retries < 1 and
            # implicitly returned None without ever calling func.
            raise ValueError("max_retries must be >= 1")

        for attempt in range(1, max_retries + 1):
            try:
                return await func()

            except Exception as e:
                if attempt == max_retries:
                    logger.error(
                        f"DEBUG: Max retries ({max_retries}) reached - {str(e)}"
                    )
                    raise

                # Exponential delay, capped at max_delay
                delay = min(
                    base_delay * (exponential_base ** (attempt - 1)),
                    max_delay
                )

                # Add jitter (±25%), clamped so the delay never goes negative
                if jitter:
                    jitter_amount = delay * 0.25
                    delay = max(
                        0.0,
                        delay + random.uniform(-jitter_amount, jitter_amount)
                    )

                logger.warning(
                    f"DEBUG: Attempt {attempt}/{max_retries} failed - "
                    f"retrying in {delay:.2f}s - Error: {str(e)}"
                )

                await asyncio.sleep(delay)

3.2 Retry Stratégies par Service

Service Max Retries Base Delay Max Delay Jitter
OpenAI API 3 2s 30s Oui
PostgreSQL 5 1s 10s Non
Qdrant 3 2s 15s Oui
Redis Streams 3 1s 5s Non
S3 Upload 5 3s 60s Oui
MeetNoo GPU 0 - - - (timeout 30min)

3.3 Circuit Breaker

class CircuitBreaker:
    """
    Circuit breaker protecting a flaky async dependency.

    States:
        CLOSED: normal operation
        OPEN: too many failures, calls rejected until `timeout` elapses
        HALF_OPEN: probing whether the service recovered
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        # Consecutive failures needed to open the circuit.
        self.failure_threshold = failure_threshold
        # Cool-down (seconds) before a recovery probe is allowed.
        self.timeout = timeout
        # Exception class that counts as a tracked failure.
        # (Fix: annotated as `type` — the original annotated an *instance*.)
        self.expected_exception = expected_exception

        self.failure_count = 0
        # Monotonic timestamp of the last failure (None until one occurs).
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED | OPEN | HALF_OPEN

    async def call(self, func: Callable):
        """
        Execute `func` under circuit-breaker protection.

        Raises:
            Exception: Immediately while the circuit is OPEN.
            expected_exception: Propagated after failure bookkeeping.
        """
        # OPEN state: reject immediately until the cool-down elapses.
        if self.state == "OPEN":
            # Fix: time.monotonic() instead of time.time() — wall-clock
            # adjustments (NTP/DST) must not stretch or shrink the cool-down.
            if time.monotonic() - self.last_failure_time < self.timeout:
                raise Exception("Circuit breaker OPEN - service unavailable")
            else:
                # Cool-down over: allow a single probe call.
                self.state = "HALF_OPEN"
                logger.info("DEBUG: Circuit breaker HALF_OPEN - testing recovery")

        try:
            result = await func()

            # Success: close the circuit and reset the failure streak.
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                logger.info("DEBUG: Circuit breaker CLOSED - service recovered")

            self.failure_count = 0
            return result

        except self.expected_exception:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            # Threshold reached: open the circuit. Since failure_count is not
            # reset on the OPEN→HALF_OPEN transition, a failed probe re-opens
            # the circuit immediately.
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error(
                    f"DEBUG: Circuit breaker OPEN - "
                    f"{self.failure_count} failures"
                )

            raise

Usage:

# Protect OpenAI API
# Shared breaker instance: after 5 consecutive openai.APIError failures the
# circuit opens and calls are rejected for 60s before a recovery probe.
openai_breaker = CircuitBreaker(
    failure_threshold=5,
    timeout=60,
    expected_exception=openai.APIError
)

# Route every LLM call through the breaker so failures feed its shared state.
# NOTE(review): CircuitBreaker.call awaits func(), so the lambda must return
# an awaitable — this requires the async OpenAI client. Confirm at call site.
async def safe_llm_call(prompt: str):
    return await openai_breaker.call(
        lambda: llm_client.chat.completions.create(...)
    )


4. Error Recovery

4.1 Database Transaction Rollback

class DatabaseService:
    @staticmethod
    def safe_transaction(db: "Session"):
        """
        Context manager for safe database transactions.

        Commits on success, rolls back (and logs) on error, and always
        closes the session.

        Fix: the original was a bare generator function — without the
        @contextlib.contextmanager decorator it cannot be used in a `with`
        statement as its own docstring showed. Implemented here as a real
        context manager object (no decorator required).

        Usage:
            with DatabaseService.safe_transaction(db) as session:
                session.add(record)
        """
        class _SafeTransaction:
            # Lightweight context manager; captures `db` via closure.
            def __enter__(self):
                return db

            def __exit__(self, exc_type, exc, tb):
                try:
                    if exc_type is None:
                        db.commit()
                    else:
                        db.rollback()
                        logger.error(
                            f"DEBUG: Transaction rollback - Error: {str(exc)}"
                        )
                finally:
                    # Always release the session, success or failure.
                    db.close()
                return False  # never suppress the caller's exception

        return _SafeTransaction()

Usage:

def save_enriched_segments(
    segments: List[EnrichedSegment],
    db: Session
):
    """
    Bulk insert of enriched segments with automatic rollback.

    On failure the transaction is rolled back, the owning transcript is
    (best-effort) marked "failed", and an HTTP 500 is raised.

    Raises:
        HTTPException: 500 when the bulk insert fails.
    """
    # Fix: the original indexed segments[0] in the error path and would have
    # crashed with IndexError on an empty batch.
    if not segments:
        logger.info("DEBUG: No enriched segments to save")
        return

    try:
        # Bulk insert
        db.bulk_save_objects(segments)
        db.commit()

        logger.info(
            f"DEBUG: Saved {len(segments)} enriched segments"
        )

    except Exception as e:
        db.rollback()

        logger.error(
            f"DEBUG: Failed to save segments - Error: {str(e)}, "
            f"rollback completed"
        )

        # Best-effort status update. Fix: if the insert failed because the
        # connection is down, this secondary write can fail too and must not
        # mask the original error; also guard a missing transcript row.
        try:
            transcript = db.query(Transcript).get(segments[0].transcript_id)
            if transcript is not None:
                transcript.status = "failed"
                transcript.error_message = str(e)
                db.commit()
        except Exception as status_error:
            logger.error(
                f"DEBUG: Failed to mark transcript as failed - "
                f"Error: {str(status_error)}"
            )

        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e

4.2 Pipeline Stage Recovery

class PipelineOrchestrator:
    async def process_transcript(
        self,
        transcript_id: str,
        db: Session
    ):
        """
        Run the transcript pipeline stage by stage with automatic recovery.

        On any stage failure: the transcript row is marked "failed" with the
        failing stage and error message, temporary resources are cleaned up,
        admins are alerted, and the exception is re-raised to the caller.
        """
        transcript = db.query(Transcript).get(transcript_id)

        # Checkpoint system: ordered (stage_name, handler) pairs; each handler
        # is a coroutine taking (transcript_id, db).
        stages = [
            ("rag_indexing", self._stage_rag_indexing),
            ("gpu_transcription", self._stage_gpu_transcription),
            ("post_processing", self._stage_post_processing),
            ("finalization", self._stage_finalization)
        ]

        for stage_name, stage_func in stages:
            try:
                # Update progress so clients can poll the current stage.
                update_transcript_progress(
                    db, transcript_id,
                    progress=self._get_stage_progress(stage_name),
                    step=stage_name,
                    message=f"Processing {stage_name}..."
                )

                # Execute stage
                await stage_func(transcript_id, db)

                logger.info(
                    f"DEBUG: Stage '{stage_name}' completed - "
                    f"Transcript: {transcript_id}"
                )

            except Exception as e:
                logger.error(
                    f"DEBUG: Stage '{stage_name}' failed - "
                    f"Transcript: {transcript_id}, Error: {str(e)}"
                )

                # Mark transcript as failed, recording which stage broke.
                transcript.status = "failed"
                transcript.current_step = stage_name
                transcript.error_message = str(e)
                db.commit()

                # Cleanup resources (temp files) before surfacing the error.
                await self._cleanup_failed_transcript(transcript_id)

                # Alert admins
                await self._send_failure_alert(transcript_id, stage_name, str(e))

                raise

    async def _cleanup_failed_transcript(self, transcript_id: str):
        """
        Clean up local resources after a pipeline failure.

        Currently removes the transcript's temporary working directory;
        the remote GPU task is expected to clean up after itself.
        """
        # Delete temporary files
        temp_dir = f"/tmp/transcript_{transcript_id}"
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
            logger.info(f"DEBUG: Cleaned up temp dir: {temp_dir}")

        # Cancel MeetNoo task
        # (MeetNoo handles its own cleanup)

5. Graceful Degradation

5.1 Partial Results Strategy

class PostProcessingService:
    """Enriches transcript segments, degrading gracefully on per-segment errors."""

    async def enrich_segments_tolerant(
        self,
        segments: List[Dict],
        db: Session,
        transcript_id: str
    ) -> Tuple[List[EnrichedSegment], List[str]]:
        """
        Error-tolerant enrichment pass.

        Each segment gets full enrichment when possible, a degraded
        placeholder when that fails, and is skipped only on a second
        (fatal) failure. Whatever succeeded is persisted.

        Returns:
            (enriched_segments, errors)
        """
        results: List[EnrichedSegment] = []
        failures: List[str] = []

        for seg in segments:
            try:
                # Happy path: full enrichment.
                results.append(
                    await self._enrich_single_segment(seg, db, transcript_id)
                )
                continue
            except Exception as e:
                logger.warning(
                    f"DEBUG: Failed to enrich segment {seg['id']} - "
                    f"Error: {str(e)}, using degraded enrichment"
                )
                first_error = e

            # Fallback path: minimal enrichment so the user still sees text.
            try:
                results.append(
                    self._create_degraded_segment(seg, transcript_id)
                )
                failures.append(f"Segment {seg['id']}: {str(first_error)}")
            except Exception as fatal_e:
                logger.error(
                    f"DEBUG: Fatal error enriching segment {seg['id']} - "
                    f"Error: {str(fatal_e)}, skipping"
                )
                failures.append(
                    f"Segment {seg['id']}: FATAL - {str(fatal_e)}"
                )

        # Persist partial results — something is better than nothing.
        if results:
            db.bulk_save_objects(results)
            db.commit()

            logger.info(
                f"DEBUG: Saved {len(results)}/{len(segments)} segments "
                f"({len(failures)} errors)"
            )

        return results, failures

    def _create_degraded_segment(
        self,
        segment: Dict,
        transcript_id: str
    ) -> EnrichedSegment:
        """
        Build a minimal EnrichedSegment when full enrichment failed
        (graceful degradation).
        """
        fields = dict(
            id=segment["id"],
            transcript_id=transcript_id,
            speaker_label=segment["speaker"],
            identified_name=None,  # speaker could not be identified
            match_source="unknown",
            start_time=segment["start_time"],
            end_time=segment["end_time"],
            transcription=segment["transcription"],
            rag_context={},  # no RAG context available
            metadata={
                "degraded": True,
                "degradation_reason": "Enrichment failed"
            },
            created_at=unix_timestamp(),
            updated_at=unix_timestamp(),
        )
        return EnrichedSegment(**fields)

5.2 UI Indicators

// Frontend: Show degraded quality
interface Segment {
  identified_name: string | null;
  metadata: {
    degraded?: boolean;
    degradation_reason?: string;
  };
}

function renderSegment(segment: Segment) {
  return (
    <div className="segment">
      <span className="speaker">
        {segment.identified_name || "Intervenant inconnu"}

        {segment.metadata.degraded && (
          <Tooltip content={segment.metadata.degradation_reason}>
            <Icon name="warning" className="text-yellow-500" />
          </Tooltip>
        )}
      </span>

      <p className="transcription">{segment.transcription}</p>
    </div>
  );
}

6. Error Logging

6.1 Structured Logging

import json
import logging
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, Optional

class StructuredLogger:
    """Thin wrapper around `logging` that emits one JSON object per record."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

    def log_error(
        self,
        message: str,
        error: Exception,
        context: Optional[Dict[str, Any]] = None
    ):
        """
        Log a structured error entry with exception type, message and
        traceback.

        NOTE: traceback.format_exc() formats the exception *currently being
        handled* — call this from inside an `except` block. (Fix: the
        original snippet never imported `traceback` and raised NameError.)
        """
        log_entry = {
            "timestamp": self._now(),
            "level": "ERROR",
            "message": message,
            "error": {
                "type": type(error).__name__,
                "message": str(error),
                "traceback": traceback.format_exc()
            },
            "context": context or {}
        }

        self.logger.error(json.dumps(log_entry, ensure_ascii=False))

    def log_warning(
        self,
        message: str,
        context: Optional[Dict[str, Any]] = None
    ):
        """Log a structured warning entry."""
        log_entry = {
            "timestamp": self._now(),
            "level": "WARNING",
            "message": message,
            "context": context or {}
        }

        self.logger.warning(json.dumps(log_entry, ensure_ascii=False))

    @staticmethod
    def _now() -> str:
        """Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)."""
        return datetime.now(timezone.utc).isoformat()

Usage:

# Module-level structured logger shared by the service.
logger = StructuredLogger("smart_transcription")

try:
    result = await process_transcript(transcript_id, db)
except Exception as e:
    # Log with full context, then re-raise so the caller still sees the
    # original exception.
    logger.log_error(
        "Transcript processing failed",
        error=e,
        context={
            "transcript_id": transcript_id,
            "user_id": user_id,
            "stage": "post_processing"
        }
    )
    raise

6.2 Error Monitoring

class ErrorTracker:
    """Aggregates errors in Redis (daily counter + last-100 details) for monitoring."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def track_error(
        self,
        error_type: str,
        error_message: str,
        context: Dict[str, Any]
    ):
        """
        Track an error in Redis for monitoring.

        Keys (per error type, per UTC day):
            errors:<type>:<YYYY-MM-DD>:count   — occurrence counter
            errors:<type>:<YYYY-MM-DD>:details — last 100 error payloads
        """
        error_key = f"errors:{error_type}:{datetime.utcnow().strftime('%Y-%m-%d')}"
        count_key = f"{error_key}:count"
        details_key = f"{error_key}:details"

        error_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "message": error_message,
            "context": json.dumps(context)
        }

        # Increment counter
        await self.redis.incr(count_key)

        # Store last 100 errors
        await self.redis.lpush(details_key, json.dumps(error_data))
        await self.redis.ltrim(details_key, 0, 99)

        # Set expiry (7 days) on the keys actually written. Fix: the original
        # expired the bare `error_key`, which is never written, so the counter
        # and details list accumulated in Redis forever.
        await self.redis.expire(count_key, 7 * 24 * 3600)
        await self.redis.expire(details_key, 7 * 24 * 3600)

Navigation: ← Data Models | Performance →