Gestion des Erreurs & Fallbacks¶
Version: 1.0.0
Date: 11 Mars 2026
Statut: Production
Table des Matières¶
- Philosophie de Gestion d'Erreurs
- Fallback Chains
- Retry Policies
- Error Recovery
- Graceful Degradation
- Error Logging
1. Philosophie de Gestion d'Erreurs¶
1.1 Principes Fondamentaux¶
graph TB
subgraph P1["NEVER FAIL SILENTLY"]
P1A["Log all errors with context"]
P1B["Rollback database transactions"]
P1C["Return meaningful error messages"]
P1D["Track errors for debugging"]
end
subgraph P2["ALWAYS HAVE FALLBACKS"]
P2A["LLM → Regex → Empty"]
P2B["Semantic chunking → Sentence splitting → Fixed size"]
P2C["Voiceprint → RAG → LLM → 'Intervenant X'"]
end
subgraph P3["GRACEFUL DEGRADATION"]
P3A["Continue with partial results"]
P3B["User gets SOMETHING (not nothing)"]
P3C["Mark degraded quality clearly"]
end
style P1 fill:#fee2e2,stroke:#dc2626,stroke-width:2px
style P2 fill:#fef3c7,stroke:#f59e0b,stroke-width:2px
style P3 fill:#d1fae5,stroke:#10b981,stroke-width:2px
1.2 Error Categories¶
| Category | Examples | Impact | Strategy |
|---|---|---|---|
| Transient | Network timeout, DB connection | Low | Retry with exponential backoff |
| Client Error | Invalid file format, missing field | Medium | Validate early, return 400 |
| Server Error | OOM, GPU unavailable | High | Fallback + alert + rollback |
| Data Quality | Empty chunks, invalid embeddings | Variable | Degrade gracefully, log |
2. Fallback Chains¶
2.1 LLM Metadata Extraction¶
graph LR
A[Raw Text] --> B{LLM
GPT-4o-mini} B -->|Success| C[Structured Metadata] B -->|Timeout/Error| D{Regex
Patterns} D -->|Match| E[Basic Metadata] D -->|No Match| F[Empty Metadata] C --> G[Continue RAG] E --> G F --> G style B fill:#10b981,stroke:#fff,color:#fff style D fill:#f59e0b,stroke:#fff,color:#fff style F fill:#ef4444,stroke:#fff,color:#fff
Implémentation:
class MetadataExtractor:
    """Extracts meeting metadata with a 3-tier fallback: LLM -> regex -> empty."""

    def __init__(self):
        self.llm_client = OpenAI()
        # Number of times the LLM tier failed and a fallback tier was used
        self.fallback_count = 0

    async def extract_metadata(
        self,
        text: str,
        max_retries: int = 3
    ) -> Dict[str, Any]:
        """
        3-tier fallback: LLM -> Regex -> Empty

        Never raises: the empty-metadata tier guarantees the pipeline
        can always continue, just without metadata.
        """
        # Tier 1: LLM extraction (GPT-4o-mini)
        try:
            metadata = await self._extract_with_llm(text, max_retries)
            logger.info("DEBUG: Metadata extracted with LLM")
            return metadata
        except Exception as e:
            logger.warning(
                f"DEBUG: LLM extraction failed: {str(e)}, "
                f"falling back to regex"
            )
            self.fallback_count += 1
        # Tier 2: regex fallback
        try:
            metadata = self._extract_with_regex(text)
            logger.info("DEBUG: Metadata extracted with regex fallback")
            return metadata
        except Exception as e:
            logger.error(
                f"DEBUG: Regex extraction failed: {str(e)}, "
                f"returning empty metadata"
            )
        # Tier 3: empty metadata (processing continues)
        return {
            "participants": [],
            "project_context": {},
            "glossary": {},
            "extraction_method": "empty_fallback"
        }

    async def _extract_with_llm(
        self,
        text: str,
        max_retries: int
    ) -> Dict[str, Any]:
        """
        LLM extraction, retrying on rate limits and timeouts.

        Raises:
            RuntimeError: if max_retries < 1. (BUGFIX: the loop previously
                never ran in that case and the method silently returned
                None, which the caller would treat as valid metadata.)
            json.JSONDecodeError / openai errors: propagate so that
                extract_metadata() triggers the regex fallback.
        """
        if max_retries < 1:
            raise RuntimeError("max_retries must be >= 1")
        for attempt in range(1, max_retries + 1):
            try:
                response = await self.llm_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": METADATA_EXTRACTION_PROMPT},
                        {"role": "user", "content": text}
                    ],
                    temperature=0.0,
                    timeout=30.0
                )
                # May raise if the model returned non-JSON; caught upstream
                return json.loads(response.choices[0].message.content)
            except openai.RateLimitError:
                if attempt >= max_retries:
                    raise
                wait_time = 2 ** attempt  # exponential backoff
                logger.warning(
                    f"DEBUG: Rate limit hit, retrying in {wait_time}s "
                    f"(attempt {attempt}/{max_retries})"
                )
                await asyncio.sleep(wait_time)
            except (openai.APITimeoutError, openai.APIConnectionError):
                if attempt >= max_retries:
                    raise
                logger.warning(
                    f"DEBUG: Timeout/connection error, retrying "
                    f"(attempt {attempt}/{max_retries})"
                )
                await asyncio.sleep(1)

    def _extract_with_regex(self, text: str) -> Dict[str, Any]:
        """Regex fallback extracting participants, emails and phone numbers."""
        participants = []
        # Pattern: "Name - Role" or "Name, Role"
        # NOTE(review): [A-Z][a-z]+ misses accented French names
        # (e.g. "Hélène Lefèvre") — confirm whether that matters upstream.
        name_role_pattern = r"([A-Z][a-z]+(?: [A-Z][a-z]+)+)\s*[-,]\s*(.+?)(?:\n|$)"
        for name, role in re.findall(name_role_pattern, text):
            participants.append({
                "name": name.strip(),
                "role": role.strip(),
                "email": None,
                "phone": None
            })
        # BUGFIX: the TLD class was "[A-Z|a-z]", which wrongly accepted a
        # literal "|" character; corrected to [A-Za-z].
        email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"
        emails = re.findall(email_pattern, text)
        # French phone numbers: +33 or leading 0, then digit pairs
        phone_pattern = r"(?:\+33|0)[1-9](?:\s?\d{2}){4}"
        phones = re.findall(phone_pattern, text)
        return {
            "participants": participants,
            "project_context": {},
            "glossary": {},
            "extraction_method": "regex_fallback",
            "confidence": 0.5,  # regex tier is less reliable than the LLM
            "emails_found": emails,
            "phones_found": phones
        }
Taux de Succès (Production):
- Tier 1 (LLM): 92% success
- Tier 2 (Regex): 6% success (fallback from LLM)
- Tier 3 (Empty): 2% (continue processing without metadata)
2.2 Text Chunking¶
graph LR
A[Document Text] --> B{Semantic
Splitter} B -->|Success| C[Semantic Chunks] B -->|Error| D{Sentence
Splitter} D -->|Success| E[Sentence Chunks] D -->|Error| F[Fixed-Size Chunks] C --> G[Embed & Index] E --> G F --> G style B fill:#10b981,stroke:#fff,color:#fff style D fill:#f59e0b,stroke:#fff,color:#fff style F fill:#ef4444,stroke:#fff,color:#fff
Implémentation:
class ChunkingService:
    def chunk_text(
        self,
        text: str,
        max_chunk_size: int = 2000
    ) -> List[str]:
        """
        Split text into chunks with a 3-tier fallback chain:
        semantic chunking -> sentence splitting -> fixed-size slices.

        Every returned chunk is at most max_chunk_size characters.
        """
        # Tier 1: semantic chunking (LlamaIndex)
        try:
            splitter = SemanticSplitter(
                buffer_size=1,
                breakpoint_percentile_threshold=95,
                embed_model=self.embed_model
            )
            chunks = splitter.split_text(text)
            # Validate the size contract before accepting the result
            if chunks and all(len(c) <= max_chunk_size for c in chunks):
                logger.info(
                    f"DEBUG: Semantic chunking success - {len(chunks)} chunks"
                )
                return chunks
            # BUGFIX: the validation failure previously fell through
            # silently; log it ("never fail silently").
            logger.warning(
                "DEBUG: Semantic chunking produced empty/oversized chunks, "
                "falling back to sentence splitting"
            )
        except Exception as e:
            logger.warning(
                f"DEBUG: Semantic chunking failed: {str(e)}, "
                f"falling back to sentence splitting"
            )
        # Tier 2: sentence splitting (NLTK)
        try:
            sentences = nltk.sent_tokenize(text, language='french')
            chunks = []
            current_chunk = []
            current_length = 0
            for sentence in sentences:
                if current_chunk and current_length + len(sentence) > max_chunk_size:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = []
                    current_length = 0
                # BUGFIX: a single sentence longer than max_chunk_size used
                # to be emitted as-is, breaking the size contract; hard-split
                # it instead.
                while len(sentence) > max_chunk_size:
                    chunks.append(sentence[:max_chunk_size])
                    sentence = sentence[max_chunk_size:]
                current_chunk.append(sentence)
                current_length += len(sentence) + 1  # +1 for the join space
            if current_chunk:
                chunks.append(" ".join(current_chunk))
            logger.info(
                f"DEBUG: Sentence splitting success - {len(chunks)} chunks"
            )
            return chunks
        except Exception as e:
            logger.error(
                f"DEBUG: Sentence splitting failed: {str(e)}, "
                f"falling back to fixed-size chunks"
            )
        # Tier 3: fixed-size slices (last resort, degraded quality)
        chunks = [
            text[i:i + max_chunk_size]
            for i in range(0, len(text), max_chunk_size)
        ]
        logger.warning(
            f"DEBUG: Using fixed-size chunking (degraded quality) - "
            f"{len(chunks)} chunks"
        )
        return chunks
2.3 Speaker Identification¶
graph LR
A[GPU Segment
SPEAKER_00] --> B{Priority 1:
Voiceprint} B -->|Match > 0.85| C[Identified] B -->|< 0.85| D{Priority 2:
RAG} D -->|Enriched| E[Identified
+ Metadata] D -->|No Match| F{Priority 3:
LLM} F -->|Confidence > 0.75| G[Identified
Pending Voiceprint] F -->|< 0.75| H[Intervenant X
Graceful Degradation] style H fill:#ef4444,stroke:#fff,color:#fff
3. Retry Policies¶
3.1 Exponential Backoff¶
class RetryPolicy:
    @staticmethod
    async def retry_with_backoff(
        func: Callable,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        exponential_base: int = 2,
        jitter: bool = True
    ):
        """
        Retry an async callable with exponential backoff and jitter.

        Delays (base_delay=1, exponential_base=2):
            attempt 1: 1s, attempt 2: 2s, attempt 3: 4s (capped at max_delay)
        Jitter: random +/-25% to avoid thundering-herd retries.

        Raises:
            ValueError: if max_retries < 1. (BUGFIX: previously the loop
                never ran and the call silently returned None.)
            Exception: the last error once max_retries is exhausted.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be >= 1")
        for attempt in range(1, max_retries + 1):
            try:
                return await func()
            except Exception as e:
                if attempt == max_retries:
                    logger.error(
                        f"DEBUG: Max retries ({max_retries}) reached - {str(e)}"
                    )
                    raise
                # Exponential backoff, capped at max_delay
                delay = min(
                    base_delay * (exponential_base ** (attempt - 1)),
                    max_delay
                )
                # Add +/-25% jitter
                if jitter:
                    jitter_amount = delay * 0.25
                    delay += random.uniform(-jitter_amount, jitter_amount)
                logger.warning(
                    f"DEBUG: Attempt {attempt}/{max_retries} failed - "
                    f"retrying in {delay:.2f}s - Error: {str(e)}"
                )
                await asyncio.sleep(delay)
3.2 Retry Stratégies par Service¶
| Service | Max Retries | Base Delay | Max Delay | Jitter |
|---|---|---|---|---|
| OpenAI API | 3 | 2s | 30s | Oui |
| PostgreSQL | 5 | 1s | 10s | Non |
| Qdrant | 3 | 2s | 15s | Oui |
| Redis Streams | 3 | 1s | 5s | Non |
| S3 Upload | 5 | 3s | 60s | Oui |
| MeetNoo GPU | 0 | - | - | - (timeout 30min) |
3.3 Circuit Breaker¶
class CircuitBreaker:
    """
    Circuit breaker: fail fast when a downstream service keeps failing,
    then probe for recovery after a cooldown.

    States: CLOSED (normal) -> OPEN (rejecting) -> HALF_OPEN (probing).
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        # Consecutive failures needed to trip the breaker
        self.failure_threshold = failure_threshold
        # Seconds to stay OPEN before allowing a recovery probe
        self.timeout = timeout
        # Exception type counted as a failure; others propagate uncounted
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED | OPEN | HALF_OPEN

    async def call(self, func: Callable):
        """
        Run ``func`` through the breaker.

        Raises:
            Exception: immediately while OPEN and within the cooldown.
            expected_exception: re-raised after being counted.
        """
        # OPEN state: reject immediately until cooldown elapses
        if self.state == "OPEN":
            if time.time() - self.last_failure_time < self.timeout:
                raise Exception("Circuit breaker OPEN - service unavailable")
            # Cooldown elapsed: allow one probe call
            self.state = "HALF_OPEN"
            logger.info("DEBUG: Circuit breaker HALF_OPEN - testing recovery")
        try:
            result = await func()
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # Threshold reached: trip the circuit
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error(
                    f"DEBUG: Circuit breaker OPEN - "
                    f"{self.failure_count} failures"
                )
            raise
        # BUGFIX: reset the failure counter on ANY success, not only when
        # recovering from HALF_OPEN. Previously sporadic failures during
        # normal (CLOSED) operation accumulated forever and eventually
        # tripped the breaker on a healthy service.
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            logger.info("DEBUG: Circuit breaker CLOSED - service recovered")
        self.failure_count = 0
        return result
Usage:
# Protect OpenAI API
openai_breaker = CircuitBreaker(
    failure_threshold=5,  # trip after 5 counted failures
    timeout=60,  # seconds OPEN before a HALF_OPEN recovery probe
    expected_exception=openai.APIError
)

async def safe_llm_call(prompt: str):
    # Route every LLM call through the breaker so a degraded OpenAI API
    # fails fast instead of stacking up long timeouts.
    return await openai_breaker.call(
        lambda: llm_client.chat.completions.create(...)
    )
4. Error Recovery¶
4.1 Database Transaction Rollback¶
class DatabaseService:
    @staticmethod
    def safe_transaction(db: "Session"):
        """
        Context manager for safe transactions: commits on success,
        rolls back and re-raises on error, always closes the session.

        BUGFIX: the original returned a bare generator, which does not
        implement the context-manager protocol, so ``with
        safe_transaction(db):`` raised at runtime. The generator is now
        wrapped with ``contextlib.contextmanager``.

        Usage:
            with DatabaseService.safe_transaction(db) as session:
                session.add(record)
            # commit happens automatically on success
        """
        # Local import keeps the snippet self-contained
        from contextlib import contextmanager

        @contextmanager
        def _transaction():
            try:
                yield db
                db.commit()
            except Exception as e:
                db.rollback()
                logger.error(
                    f"DEBUG: Transaction rollback - Error: {str(e)}"
                )
                raise
            finally:
                db.close()

        return _transaction()
Usage:
def save_enriched_segments(
    segments: List[EnrichedSegment],
    db: Session
):
    """
    Bulk insert with automatic rollback.

    On failure: rolls back, best-effort marks the parent transcript as
    failed, then raises HTTPException(500) carrying the ORIGINAL error.
    """
    try:
        # Bulk insert
        db.bulk_save_objects(segments)
        db.commit()
        logger.info(
            f"DEBUG: Saved {len(segments)} enriched segments"
        )
    except Exception as e:
        db.rollback()
        logger.error(
            f"DEBUG: Failed to save segments - Error: {str(e)}, "
            f"rollback completed"
        )
        # BUGFIX: guard the status update. Previously segments[0] raised
        # IndexError on an empty list, and a failing status commit
        # replaced the original error; both masked the real failure.
        if segments:
            try:
                transcript = db.query(Transcript).get(segments[0].transcript_id)
                if transcript is not None:
                    transcript.status = "failed"
                    transcript.error_message = str(e)
                    db.commit()
            except Exception as status_error:
                db.rollback()
                logger.error(
                    f"DEBUG: Failed to mark transcript as failed - "
                    f"Error: {str(status_error)}"
                )
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        )
4.2 Pipeline Stage Recovery¶
class PipelineOrchestrator:
    async def process_transcript(
        self,
        transcript_id: str,
        db: Session
    ):
        """
        Run the transcript pipeline stage by stage with automatic recovery.

        On any stage failure: marks the transcript as failed (recording
        the failing stage and error), cleans up temporary resources,
        alerts admins, then re-raises the original exception.
        """
        transcript = db.query(Transcript).get(transcript_id)
        # Checkpoint system: ordered (name, coroutine) pairs; the failing
        # stage name is persisted in transcript.current_step below.
        stages = [
            ("rag_indexing", self._stage_rag_indexing),
            ("gpu_transcription", self._stage_gpu_transcription),
            ("post_processing", self._stage_post_processing),
            ("finalization", self._stage_finalization)
        ]
        for stage_name, stage_func in stages:
            try:
                # Update progress before running so the UI shows the
                # in-flight stage
                update_transcript_progress(
                    db, transcript_id,
                    progress=self._get_stage_progress(stage_name),
                    step=stage_name,
                    message=f"Processing {stage_name}..."
                )
                # Execute stage
                await stage_func(transcript_id, db)
                logger.info(
                    f"DEBUG: Stage '{stage_name}' completed - "
                    f"Transcript: {transcript_id}"
                )
            except Exception as e:
                logger.error(
                    f"DEBUG: Stage '{stage_name}' failed - "
                    f"Transcript: {transcript_id}, Error: {str(e)}"
                )
                # Mark transcript as failed, keeping the failing stage
                transcript.status = "failed"
                transcript.current_step = stage_name
                transcript.error_message = str(e)
                db.commit()
                # Cleanup resources (temp files)
                await self._cleanup_failed_transcript(transcript_id)
                # Alert admins
                await self._send_failure_alert(transcript_id, stage_name, str(e))
                raise

    async def _cleanup_failed_transcript(self, transcript_id: str):
        """
        Clean up local resources after a failed run.

        Only temp files are removed here; the GPU service handles its
        own task cleanup.
        """
        # Delete temporary files
        temp_dir = f"/tmp/transcript_{transcript_id}"
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
            logger.info(f"DEBUG: Cleaned up temp dir: {temp_dir}")
        # Cancel MeetNoo task
        # (MeetNoo handles its own cleanup)
5. Graceful Degradation¶
5.1 Partial Results Strategy¶
class PostProcessingService:
    async def enrich_segments_tolerant(
        self,
        segments: List[Dict],
        db: Session,
        transcript_id: str
    ) -> Tuple[List[EnrichedSegment], List[str]]:
        """
        Error-tolerant enrichment: full enrichment per segment, degraded
        enrichment on failure, skip only on a double failure.

        Returns:
            (enriched_segments, errors) — partial results are saved even
            when some segments failed.
        """
        enriched = []
        errors = []
        for segment in segments:
            # ROBUSTNESS: a missing "id" key must not raise inside the
            # error handlers below and abort the whole batch.
            seg_id = segment.get("id", "<unknown>")
            try:
                # Try full enrichment
                enriched_segment = await self._enrich_single_segment(
                    segment, db, transcript_id
                )
                enriched.append(enriched_segment)
            except Exception as e:
                logger.warning(
                    f"DEBUG: Failed to enrich segment {seg_id} - "
                    f"Error: {str(e)}, using degraded enrichment"
                )
                # Fallback: basic enrichment
                try:
                    degraded_segment = self._create_degraded_segment(
                        segment, transcript_id
                    )
                    enriched.append(degraded_segment)
                    errors.append(f"Segment {seg_id}: {str(e)}")
                except Exception as fatal_e:
                    logger.error(
                        f"DEBUG: Fatal error enriching segment {seg_id} - "
                        f"Error: {str(fatal_e)}, skipping"
                    )
                    errors.append(
                        f"Segment {seg_id}: FATAL - {str(fatal_e)}"
                    )
        # Save partial results
        if enriched:
            db.bulk_save_objects(enriched)
            db.commit()
            logger.info(
                f"DEBUG: Saved {len(enriched)}/{len(segments)} segments "
                f"({len(errors)} errors)"
            )
        return enriched, errors

    def _create_degraded_segment(
        self,
        segment: Dict,
        transcript_id: str
    ) -> EnrichedSegment:
        """
        Build a minimal enriched segment (graceful degradation): unknown
        speaker, empty RAG context, and an explicit "degraded" flag so
        the UI can surface the quality drop.
        """
        return EnrichedSegment(
            id=segment["id"],
            transcript_id=transcript_id,
            speaker_label=segment["speaker"],
            identified_name=None,  # unknown speaker
            match_source="unknown",
            start_time=segment["start_time"],
            end_time=segment["end_time"],
            transcription=segment["transcription"],
            # Empty RAG context
            rag_context={},
            # Flag consumed by the frontend warning indicator
            metadata={
                "degraded": True,
                "degradation_reason": "Enrichment failed"
            },
            created_at=unix_timestamp(),
            updated_at=unix_timestamp()
        )
5.2 UI Indicators¶
// Frontend: Show degraded quality
interface Segment {
// null when no identification tier succeeded (UI shows a generic label)
identified_name: string | null;
metadata: {
// set by the backend when enrichment fell back to degraded mode
degraded?: boolean;
degradation_reason?: string;
};
}
function renderSegment(segment: Segment) {
// Falls back to a generic speaker label and shows a warning tooltip
// when the backend flagged the segment as degraded.
return (
<div className="segment">
<span className="speaker">
{segment.identified_name || "Intervenant inconnu"}
{segment.metadata.degraded && (
<Tooltip content={segment.metadata.degradation_reason}>
<Icon name="warning" className="text-yellow-500" />
</Tooltip>
)}
</span>
<p className="transcription">{segment.transcription}</p>
</div>
);
}
6. Error Logging¶
6.1 Structured Logging¶
import json
import logging
import traceback
from datetime import datetime, timezone
from typing import Any, Dict
class StructuredLogger:
    """JSON-structured logger wrapper for error monitoring pipelines."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

    def log_error(
        self,
        message: str,
        error: Exception,
        context: Dict[str, Any] = None
    ):
        """
        Log a structured error entry.

        BUGFIX: uses the exception's own __traceback__ instead of
        traceback.format_exc(), which only works while the except block
        is active (it logged "NoneType: None" when called later).
        """
        log_entry = {
            # Timezone-aware UTC (datetime.utcnow() is deprecated in 3.12+
            # and produced naive timestamps mislabeled as UTC)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": "ERROR",
            "message": message,
            "error": {
                "type": type(error).__name__,
                "message": str(error),
                "traceback": "".join(
                    traceback.format_exception(
                        type(error), error, error.__traceback__
                    )
                )
            },
            "context": context or {}
        }
        self.logger.error(json.dumps(log_entry, ensure_ascii=False))

    def log_warning(
        self,
        message: str,
        context: Dict[str, Any] = None
    ):
        """Log a structured warning entry."""
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": "WARNING",
            "message": message,
            "context": context or {}
        }
        self.logger.warning(json.dumps(log_entry, ensure_ascii=False))
Usage:
logger = StructuredLogger("smart_transcription")
try:
    result = await process_transcript(transcript_id, db)
except Exception as e:
    # Emit one structured entry with full context, then re-raise so the
    # caller's own error handling (and HTTP error mapping) still runs.
    logger.log_error(
        "Transcript processing failed",
        error=e,
        context={
            "transcript_id": transcript_id,
            "user_id": user_id,
            "stage": "post_processing"
        }
    )
    raise
6.2 Error Monitoring¶
class ErrorTracker:
    """Tracks errors in Redis for lightweight monitoring dashboards."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def track_error(
        self,
        error_type: str,
        error_message: str,
        context: Dict[str, Any]
    ):
        """
        Track an error in Redis: a daily counter plus a capped list of
        the 100 most recent error details, partitioned per error type
        and per UTC day. Both keys expire after 7 days.
        """
        day = datetime.now(timezone.utc).strftime('%Y-%m-%d')
        error_key = f"errors:{error_type}:{day}"
        count_key = f"{error_key}:count"
        details_key = f"{error_key}:details"
        error_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message": error_message,
            "context": json.dumps(context)
        }
        # Increment daily counter
        await self.redis.incr(count_key)
        # Store last 100 errors
        await self.redis.lpush(details_key, json.dumps(error_data))
        await self.redis.ltrim(details_key, 0, 99)
        # BUGFIX: expire the keys actually written. The TTL was previously
        # set on the bare error_key, which is never written, so the count
        # and details keys accumulated forever.
        ttl = 7 * 24 * 3600
        await self.redis.expire(count_key, ttl)
        await self.redis.expire(details_key, ttl)
Navigation: ← Data Models | Performance →