Project Overview — What we're building and why
You have 190 audio meeting files. The goal is to extract structured intelligence from every single one — who spoke, for how long, what was the sentiment, what topics came up, and make all of it searchable using vector similarity.
# Core transcription
openai-whisper
# Speaker diarization
pyannote.audio==3.1.1
torch # imported directly by the diarizer to pick cuda/cpu
# Embeddings
sentence-transformers
# Sentiment analysis
transformers # imported directly by the speaker-metrics module
# Vector store
faiss-cpu # use faiss-gpu if you have a GPU
# Audio processing
librosa
soundfile
pydub
# Utilities
tqdm
numpy
pandas
rich # beautiful terminal output
Transcription Engine — Turning audio into structured text with Whisper
OpenAI Whisper is a fully open-source, state-of-the-art speech recognition model that runs locally — no API key, no cost. It produces word-level timestamps which are essential for linking speaker labels from diarization.
Choosing the right Whisper model
| Model | Size | Relative speed (vs. large) | Accuracy | Use case |
|---|---|---|---|---|
| tiny | 39M params | ~32× | Medium | Quick prototyping |
| base | 74M params | ~16× | Good | Development testing |
| small | 244M params | ~6× | Better | Balanced choice ✓ |
| medium | 769M params | ~2× | Great | Production quality |
| large | 1.5B params | 1× | Best | GPU recommended |
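Recommendation: use small on CPU. With meetings averaging 60 minutes, the small model should process each one in roughly 10 minutes on a modern CPU (benchmark one file on your own hardware first). Total for 190 files: ~32 hours. Run it unattended, e.g. over a weekend, or use a GPU to cut it to ~2 hours.
import whisper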
import json
import os
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Optional
from rich.console import Console
console = Console()
@dataclass
class Segment:
    """One utterance: start/end times, text, and optional speaker/word detail."""

    start: float  # utterance start time in seconds
    end: float  # utterance end time in seconds
    text: str
    speaker: str = "UNKNOWN"  # placeholder until diarization fills it in
    # Word-level timestamps. The default is None (not a list), so the
    # annotation is Optional to match what instances can actually hold.
    words: Optional[List[dict]] = None
class Transcriber:
    """Wrap a locally loaded Whisper model and cache transcripts as JSON.

    Each transcript is written to disk right after transcription so an
    interrupted batch run can resume without redoing finished files.
    """

    def __init__(self, model_size: str = "small"):
        """Load the Whisper checkpoint named ``model_size``."""
        console.print(f"[cyan]Loading Whisper model: {model_size}...")
        self.model = whisper.load_model(model_size)
        console.print("[green]✓ Model loaded")

    def transcribe(self, audio_path: str) -> dict:
        """Transcribe one audio file.

        Returns:
            The raw Whisper result dict, including word-level timestamps.
        """
        return self.model.transcribe(
            audio_path,
            word_timestamps=True,  # critical: enables per-word timing
            language="en",  # set None for auto-detect
            task="transcribe",
            verbose=False,
        )

    def to_segments(self, whisper_result: dict) -> List[Segment]:
        """Convert raw Whisper output to clean ``Segment`` objects.

        Times are rounded to three decimals and the text is stripped.
        ``words`` falls back to an empty list when a segment has none.
        """
        return [
            Segment(
                start=round(seg["start"], 3),
                end=round(seg["end"], 3),
                text=seg["text"].strip(),
                words=seg.get("words", []),
            )
            for seg in whisper_result["segments"]
        ]

    def transcribe_and_save(
        self,
        audio_path: str,
        output_path: str,
        skip_if_exists: bool = True
    ) -> List[Segment]:
        """Transcribe ``audio_path`` and persist the result to ``output_path``.

        Args:
            audio_path: Audio file to transcribe.
            output_path: JSON file used as both cache and output.
            skip_if_exists: When True and ``output_path`` holds a readable
                transcript, return it instead of transcribing again.

        Returns:
            The segments of the transcript. Segments are saved before any
            diarization, so cached ones carry the default speaker label.
        """
        audio_name = Path(audio_path).name

        if skip_if_exists and os.path.exists(output_path):
            try:
                with open(output_path, encoding="utf-8") as f:
                    data = json.load(f)
                cached = [Segment(**s) for s in data["segments"]]
            except (ValueError, KeyError, TypeError):
                # A truncated or foreign file must not poison every later
                # run: fall through and rebuild the transcript instead.
                console.print(
                    f"[yellow]⚠ Unreadable cache, re-transcribing: {audio_name}"
                )
            else:
                console.print(f"[yellow]⏭ Skipping (cached): {audio_name}")
                return cached

        console.print(f"[blue]🎙 Transcribing: {audio_name}")
        raw = self.transcribe(audio_path)
        segs = self.to_segments(raw)

        output = {
            "file": audio_path,
            "language": raw["language"],
            "segments": [asdict(s) for s in segs],
        }

        # Persist immediately (resume-friendly). Write to a sibling temp file
        # and rename it into place so a crash mid-write cannot leave a
        # half-written JSON that the cache branch would later trip over.
        out_path = Path(output_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = out_path.with_name(out_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2)
        os.replace(tmp_path, out_path)

        console.print(f"[green]✓ Saved: {out_path.name}")
        return segs
Without word_timestamps=True, you can only assign speakers at the segment level (5–30 second chunks). With word timestamps, you can align speakers at word-level precision — critical for meetings with rapid back-and-forth conversation.
Speaker Diarization — Who said what and when
Transcription tells you what was said. Diarization answers who said it. We'll use pyannote.audio — the current open-source SOTA for speaker diarization.
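Visit hf.co/pyannote/speaker-diarization-3.1 and accept the usage conditions (free); the model card also asks you to accept the conditions for pyannote/segmentation-3.0, which the pipeline depends on. Then run huggingface-cli login with your token.
from pyannote.audio import Pipeline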
from pyannote.audio.pipelines.utils.hook import ProgressHook
import torch
from typing import List
from .transcribe import Segment
class Diarizer:
    """Run pyannote speaker diarization and label transcript segments."""

    def __init__(self):
        """Load the pretrained pipeline onto CUDA when available, else CPU."""
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=True  # reads from HuggingFace cache
        ).to(torch.device(device))

    def diarize(self, audio_path: str):
        """Run the pipeline on one file and return its annotation object.

        The result is whatever the pyannote pipeline returns (not a dict).
        ``assign_speakers`` only relies on its ``itertracks`` method.
        """
        with ProgressHook() as hook:
            return self.pipeline(audio_path, hook=hook)

    def assign_speakers(
        self,
        segments: List[Segment],
        diarization
    ) -> List[Segment]:
        """Label each segment with the speaker who overlaps it the most.

        Overlap is summed per speaker across all of that speaker's turns,
        so a speaker with several short turns inside a segment can win
        over one with a single longer turn. Segments with no positive
        overlap get "UNKNOWN".

        Args:
            segments: Transcript segments; mutated in place.
            diarization: Object exposing ``itertracks(yield_label=True)``
                that yields ``(turn, track, speaker)`` with
                ``turn.start`` / ``turn.end``.

        Returns:
            The same ``segments`` list, for call chaining.
        """
        # Materialise the timeline once instead of re-walking it per segment.
        turns = [
            (turn.start, turn.end, speaker)
            for turn, _, speaker in diarization.itertracks(yield_label=True)
        ]

        for seg in segments:
            overlap_by_speaker = {}
            for turn_start, turn_end, speaker in turns:
                # seconds shared by the segment and this turn (<= 0: none)
                overlap = min(seg.end, turn_end) - max(seg.start, turn_start)
                if overlap > 0:
                    overlap_by_speaker[speaker] = (
                        overlap_by_speaker.get(speaker, 0.0) + overlap
                    )

            if overlap_by_speaker:
                # max() keeps the first-seen speaker on ties
                seg.speaker = max(overlap_by_speaker, key=overlap_by_speaker.get)
            else:
                seg.speaker = "UNKNOWN"  # e.g. instead of "SPEAKER_00"
        return segments

    def count_speakers(self, segments: List[Segment]) -> int:
        """Count distinct speaker labels, ignoring "UNKNOWN"."""
        return len({s.speaker for s in segments if s.speaker != "UNKNOWN"})
After diarization, each segment has a speaker field like "SPEAKER_00", "SPEAKER_01". Note that speaker IDs are local to each meeting — SPEAKER_00 in Meeting A is not the same person as SPEAKER_00 in Meeting B unless you run cross-meeting speaker embedding matching.
Embeddings — Converting text to dense vectors for semantic search
An embedding is a list of ~384 numbers that encodes the meaning of a sentence. Two sentences with similar meaning will have similar vectors — even if they use different words. This is what powers semantic search over your meetings.
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List
from .transcribe import Segment
class Embedder:
    """Batch-embed meeting segments with a sentence-transformers model.

    The default model is all-MiniLM-L6-v2 (384-dimensional vectors). The
    actual dimension is read from the loaded model into ``self.dim``, so
    a different ``model_name`` keeps the shapes below correct.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load ``model_name`` and record its embedding dimension."""
        self.model = SentenceTransformer(model_name)
        self.dim = self.model.get_sentence_embedding_dimension()

    def embed_segments(
        self,
        segments: List[Segment],
        batch_size: int = 64
    ) -> np.ndarray:
        """Embed the ``text`` of every segment.

        Returns:
            float32 array of shape ``(len(segments), self.dim)``,
            normalised to unit length so cosine similarity is a dot
            product. An empty input gives shape ``(0, self.dim)`` without
            calling the model, so callers always get a 2-D array.
        """
        texts = [s.text for s in segments]
        if not texts:
            return np.empty((0, self.dim), dtype=np.float32)

        vectors = self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=True,
            normalize_embeddings=True,  # L2 normalise → cosine = dot product
        )
        return np.asarray(vectors, dtype=np.float32)  # FAISS requires float32

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a search query; use the same model the index was built with.

        Returns:
            Normalised float32 array with one row (the query).
        """
        vector = self.model.encode([query], normalize_embeddings=True)
        return np.asarray(vector, dtype=np.float32)
# Sketch: gather the segments of every meeting, then embed them in a single
# call so the model sees one long batched stream of texts.
all_segments = []
all_meta = []  # all_meta[i] records which meeting/speaker produced vectors[i]

for audio_file in audio_files:
    segs = transcriber.transcribe_and_save(...)
    segs = diarizer.assign_speakers(segs, ...)
    all_segments.extend(segs)
    all_meta.extend(
        {
            "meeting_id": audio_file.stem,
            "speaker": seg.speaker,
            "start": seg.start,
            "end": seg.end,
            "text": seg.text,
        }
        for seg in segs
    )

# One encode call over everything collected above (batches of 128 here)
embedder = Embedder()
vectors = embedder.embed_segments(all_segments, batch_size=128)
# vectors has one row per entry in all_segments, in the same order as all_meta
You have a choice in granularity — what unit of text gets one embedding. The code in this guide uses the simplest option: one embedding per Whisper segment.
FAISS Deep Dive — Facebook AI Similarity Search: how it works internally
FAISS is a library that lets you search through millions of vectors in milliseconds. Understanding how it works internally will help you choose the right index type and tune it correctly.
Think of a FAISS index as a smart lookup table for vectors. When you add a vector, FAISS organizes it internally so that when you later ask "find me the 5 most similar vectors to this query", it doesn't have to compare against every vector — it can skip large regions of the space that are definitely not close.
The simplest index is IndexFlatL2 — it does exact brute-force comparison against every stored vector, so it is perfectly accurate. For 190 meetings with ~500 segments each (~95,000 total vectors at 384 dims), this is about 95,000 × 384 × 4 bytes = ~140 MB — fast and exact, still fine on a laptop.
If your vectors are L2-normalised (which our embedder does automatically with normalize_embeddings=True), then cosine similarity = dot product = L2 distance in terms of ranking. Use IndexFlatIP (Inner Product) — it returns the highest dot-product, i.e. the most similar segment.
If vectors are not normalised, use IndexFlatL2 which returns lowest Euclidean distance.
For under 1M vectors (your 95k is well under this), IndexFlatIP is fast enough — search takes <10ms. If you grow to 5M+ segments (thousands of meetings accumulated over years), consider:
- IndexIVFFlat — clusters vectors into Voronoi cells. Search only nearby clusters. ~50× faster with slight accuracy loss.
- IndexHNSWFlat — hierarchical graph navigation. Better accuracy than IVF, slightly larger memory. Good production choice.
FAISS Index Visualised
import faiss
import numpy as np
import json
from pathlib import Path
from typing import List, Dict
class FAISSStore:
    """Persistent FAISS index with a metadata sidecar.

    Files created:
        index_dir/meeting_index.faiss — the binary FAISS index
        index_dir/metadata.json — list of dicts, one per vector
        index_dir/indexed_files.json — list of already-processed meeting IDs

    Invariant: ``metadata[i]`` describes vector ``i`` of the index.
    """

    def __init__(self, index_dir: str, dim: int = 384):
        """Open the store in ``index_dir``, creating an empty one if needed.

        Args:
            index_dir: Directory holding the three store files.
            dim: Vector dimension for a newly created index. When an
                index already exists on disk, its own dimension is used.
        """
        self.index_dir = Path(index_dir)
        self.index_dir.mkdir(parents=True, exist_ok=True)
        self.dim = dim
        self.index_path = self.index_dir / "meeting_index.faiss"
        self.meta_path = self.index_dir / "metadata.json"
        self.done_path = self.index_dir / "indexed_files.json"
        self._load_or_create()

    def _load_or_create(self):
        """Load index, metadata and registry from disk, or start empty.

        Raises:
            RuntimeError: If the loaded index and metadata disagree on
                the number of entries (search results would be mislabelled).
        """
        if not self.index_path.exists():
            # IndexFlatIP: exact cosine search (requires normalised vectors)
            self.index = faiss.IndexFlatIP(self.dim)
            self.metadata = []
            self.indexed_files = set()
            return

        self.index = faiss.read_index(str(self.index_path))
        with open(self.meta_path, encoding="utf-8") as f:
            self.metadata = json.load(f)
        with open(self.done_path, encoding="utf-8") as f:
            self.indexed_files = set(json.load(f))

        if self.index.ntotal != len(self.metadata):
            raise RuntimeError(
                f"FAISS index holds {self.index.ntotal} vectors but "
                f"{self.meta_path.name} has {len(self.metadata)} rows; "
                "the store is out of sync and should be rebuilt."
            )
        self.dim = self.index.d  # trust the dimension of the stored index

    def add_meeting(
        self,
        meeting_id: str,
        vectors: np.ndarray,
        meta_rows: List[Dict]
    ):
        """Add one meeting's segments to the index and persist immediately.

        Args:
            meeting_id: Registry key recorded as processed.
            vectors: One row per segment.
            meta_rows: One dict per segment, parallel to ``vectors``.

        Raises:
            ValueError: If ``vectors`` and ``meta_rows`` differ in length;
                adding them would shift every later metadata row.
        """
        if len(vectors) != len(meta_rows):
            raise ValueError(
                f"{meeting_id}: got {len(vectors)} vectors for "
                f"{len(meta_rows)} metadata rows"
            )

        # A meeting with no segments is still registered as processed, but
        # nothing is handed to FAISS (an empty array may not even be 2-D).
        if meta_rows:
            self.index.add(vectors)
            self.metadata.extend(meta_rows)
        self.indexed_files.add(meeting_id)
        self.save()

    def save(self):
        """Persist index, metadata and registry to disk.

        Every file is first written completely to a ``.tmp`` sibling and
        only then renamed over the live file, so a crash while writing
        never truncates the existing store. The three renames are not one
        transaction; ``_load_or_create`` detects the resulting mismatch.
        """
        tmp_index = self.index_path.with_name(self.index_path.name + ".tmp")
        tmp_meta = self.meta_path.with_name(self.meta_path.name + ".tmp")
        tmp_done = self.done_path.with_name(self.done_path.name + ".tmp")

        faiss.write_index(self.index, str(tmp_index))
        with open(tmp_meta, "w", encoding="utf-8") as f:
            json.dump(self.metadata, f)
        with open(tmp_done, "w", encoding="utf-8") as f:
            # sorted: stable file content regardless of set ordering
            json.dump(sorted(self.indexed_files), f)

        tmp_index.replace(self.index_path)
        tmp_meta.replace(self.meta_path)
        tmp_done.replace(self.done_path)

    def search(
        self,
        query_vector: np.ndarray,
        top_k: int = 10
    ) -> List[Dict]:
        """Return up to ``top_k`` most similar segments for one query.

        Args:
            query_vector: Normalised query of shape ``(1, dim)``; a 1-D
                vector of length ``dim`` is accepted and reshaped.
            top_k: Maximum number of hits; values <= 0 yield no results.

        Returns:
            Copies of the matching metadata dicts, best first, each with
            an added float ``"score"``.
        """
        if top_k <= 0:
            return []

        query = np.asarray(query_vector, dtype=np.float32)
        if query.ndim == 1:
            query = query.reshape(1, -1)

        scores, indices = self.index.search(query, top_k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:
                continue  # FAISS returns -1 for empty slots
            hit = self.metadata[idx].copy()
            hit["score"] = float(score)
            results.append(hit)
        return results
Incremental Indexing — Processing new meetings without rebuilding the entire index
This is the most important engineering decision. You have 190 meetings now, but meetings happen every week. You never want to re-process files you've already indexed.
Check the indexed_files registry
On startup, load the set of meeting IDs already in the index. Any file whose stem (e.g. "2024-11-15_standup") is in this set is skipped entirely.
Process only NEW files
Scan the audio folder, diff against the registry. Only new files enter the pipeline — transcription, diarization, embedding.
Append vectors to the existing FAISS index
FAISS index.add(new_vectors) appends in-place. The existing vectors are not touched. Old query results remain valid.
Extend the metadata list and save
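The metadata JSON grows by the number of new segments, and the FAISS binary is rewritten. Both are saved together after each meeting, so a run interrupted between meetings leaves a consistent store. They are separate files, however, so a crash in the middle of a save can leave them out of sync — keep the cached transcripts so the index can be rebuilt.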
from pathlib import Path
from rich.console import Console
from rich.progress import track
from .transcribe import Transcriber
from .diarize import Diarizer
from .embed import Embedder
from .faiss_store import FAISSStore
console = Console()
def run_incremental_pipeline(
    audio_dir: str = "data/raw_audio",
    output_dir: str = "data/transcripts",
    index_dir: str = "data/faiss_index",
    model_size: str = "small"
):
    """Index every audio file in ``audio_dir`` that is not in the store yet.

    For each new ``*.mp3`` / ``*.wav`` file: transcribe (cached as JSON in
    ``output_dir``), diarize, embed, and append to the FAISS store in
    ``index_dir``. A file's stem is its meeting ID.

    Args:
        audio_dir: Directory scanned (non-recursively) for audio files.
        output_dir: Directory for per-meeting transcript JSON files.
        index_dir: Directory of the persistent FAISS store.
        model_size: Whisper model size passed to ``Transcriber``.
    """
    # --- Setup ---
    audio_root = Path(audio_dir)
    audio_files = sorted(audio_root.glob("*.mp3")) + sorted(audio_root.glob("*.wav"))
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    store = FAISSStore(index_dir)

    # --- Filter to only NEW files ---
    new_files = [f for f in audio_files if f.stem not in store.indexed_files]
    if not new_files:
        console.print("[green]✓ Index is up-to-date. No new files to process.")
        return
    console.print(f"[bold orange1]Found {len(new_files)} new files to index.")

    # Models are loaded only once we know there is work to do, so an
    # up-to-date index returns without loading any of them.
    transcriber = Transcriber(model_size)
    diarizer = Diarizer()
    embedder = Embedder()

    # --- Process each new file ---
    for audio_file in track(new_files, description="Indexing..."):
        meeting_id = audio_file.stem

        # Two files sharing a stem (a.mp3 + a.wav) map to one meeting ID;
        # the second would reuse the first one's cached transcript and be
        # indexed twice, so it is skipped.
        if meeting_id in store.indexed_files:
            console.print(
                f"[yellow]⏭ Skipping (meeting id already indexed): {audio_file.name}"
            )
            continue

        out_json = Path(output_dir) / f"{meeting_id}.json"

        # Step 1: Transcribe (or load the cached transcript)
        segments = transcriber.transcribe_and_save(str(audio_file), str(out_json))

        if not segments:
            # Nothing to embed (e.g. silence). Register the meeting so it is
            # not retried on every run, without handing FAISS an empty batch.
            store.indexed_files.add(meeting_id)
            store.save()
            console.print(f"[yellow]⚠ No speech segments: {meeting_id}")
            continue

        # Step 2: Diarize and label each segment with a speaker
        diarization = diarizer.diarize(str(audio_file))
        segments = diarizer.assign_speakers(segments, diarization)

        # Step 3: Embed
        vectors = embedder.embed_segments(segments)

        # Step 4: Build metadata rows (parallel to vectors)
        meta_rows = [
            {
                "meeting_id": meeting_id,
                "speaker": s.speaker,
                "start": s.start,
                "end": s.end,
                "text": s.text,
            }
            for s in segments
        ]

        # Step 5: Append to FAISS (incremental!)
        store.add_meeting(meeting_id, vectors, meta_rows)
        console.print(f"[green]✓ Indexed: {meeting_id} ({len(segments)} segments)")

    console.print(f"[bold green]Done! Total indexed: {store.index.ntotal} segments")
store.indexed_files is a set persisted to indexed_files.json. Every time a meeting is added, its ID is added to the set and the file is rewritten. When you run the pipeline next week with 5 new meetings, only those 5 are processed. Your 190 already-indexed meetings are untouched — zero re-computation.
Speaker Metrics — Per-person and per-meeting analytics
from collections import defaultdict
from transformers import pipeline as hf_pipeline
import numpy as np
from typing import List, Dict
from .transcribe import Segment
# Module-wide sentiment classifier, built once when this module is imported
# (the model is downloaded on first use and then runs locally).
_sentiment = hf_pipeline(
    task="sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment",
    truncation=True,
)
def compute_meeting_metrics(
    segments: List[Segment],
    meeting_id: str,
    total_duration: float,
    interruption_gap: float = 0.5,
    sentiment_fn=None,
) -> Dict:
    """Compute per-speaker talk-time, turn and sentiment metrics for a meeting.

    Args:
        segments: Diarized segments in chronological order.
        meeting_id: Identifier copied into the result.
        total_duration: Meeting length in seconds, used for talk ratios.
        interruption_gap: A segment counts as an interruption when the
            speaker differs from the previous segment's and it starts
            less than this many seconds after that segment ended.
        sentiment_fn: Optional classifier called as
            ``sentiment_fn(texts, batch_size=32)`` and returning one
            ``{"label": ...}`` dict per text. Defaults to the module-level
            ``_sentiment`` pipeline.

    Returns:
        Dict with ``meeting_id``, ``total_duration``, ``num_speakers``
        and a ``speakers`` mapping of per-speaker metrics. "UNKNOWN"
        keeps its row under ``speakers`` but is not counted in
        ``num_speakers``. ``talk_ratio_pct`` is 0.0 when
        ``total_duration`` is not positive.
    """
    stats = defaultdict(lambda: {
        "talk_time": 0.0,
        "turn_count": 0,
        "word_count": 0,
        "interruptions": 0,
        "sentiment": [],
    })

    texts_for_sentiment = []
    speaker_per_text = []  # speaker_per_text[i] spoke texts_for_sentiment[i]
    prev = None

    for seg in segments:
        sp = seg.speaker
        entry = stats[sp]
        entry["talk_time"] += seg.end - seg.start
        entry["turn_count"] += 1
        entry["word_count"] += len(seg.text.split())

        # Interruption detection: speaker change with (almost) no pause
        if (
            prev is not None
            and prev.speaker != sp
            and seg.start - prev.end < interruption_gap
        ):
            entry["interruptions"] += 1

        # Collect for batch sentiment analysis
        texts_for_sentiment.append(seg.text[:512])  # truncate for model
        speaker_per_text.append(sp)
        prev = seg

    # One batched classifier call (much faster than one-by-one); skipped
    # entirely for an empty meeting so the model never sees an empty batch.
    if texts_for_sentiment:
        classify = _sentiment if sentiment_fn is None else sentiment_fn
        results = classify(texts_for_sentiment, batch_size=32)
        for sp, result in zip(speaker_per_text, results):
            stats[sp]["sentiment"].append(result["label"])

    # Summarise
    summary = {
        "meeting_id": meeting_id,
        "total_duration": total_duration,
        # Same rule as Diarizer.count_speakers: "UNKNOWN" is not a person.
        "num_speakers": sum(1 for sp in stats if sp != "UNKNOWN"),
        "speakers": {},
    }
    for sp, s in stats.items():
        labels = s["sentiment"]
        # Label ids as emitted by the default model
        sent_counts = {
            "positive": labels.count("LABEL_2"),
            "neutral": labels.count("LABEL_1"),
            "negative": labels.count("LABEL_0"),
        }
        if total_duration > 0:
            talk_ratio = round(s["talk_time"] / total_duration * 100, 1)
        else:
            talk_ratio = 0.0  # unknown/zero duration: avoid dividing by zero
        summary["speakers"][sp] = {
            "talk_time_sec": round(s["talk_time"], 2),
            "talk_ratio_pct": talk_ratio,
            "turn_count": s["turn_count"],
            "avg_turn_sec": round(s["talk_time"] / s["turn_count"], 2),
            "word_count": s["word_count"],
            "interruptions": s["interruptions"],
            "sentiment": sent_counts,
        }
    return summary
Full Pipeline Entry Point — Putting it all together (run.py)
#!/usr/bin/env python3
"""
Meeting Audio Analyser — Entry Point
Usage:
python run.py # index new files
python run.py --query "budget" # semantic search
python run.py --report # per-speaker report (planned; the parser below does not define this flag yet)
"""
import argparse
from rich.console import Console
from rich.table import Table
from src.pipeline import run_incremental_pipeline
from src.faiss_store import FAISSStore
from src.embed import Embedder
console = Console()
def search(query: str, top_k: int = 10):
    """Run a semantic query against the index and print the hits as a table."""
    store = FAISSStore("data/faiss_index")
    embedder = Embedder()
    hits = store.search(embedder.embed_query(query), top_k=top_k)

    table = Table(title=f'Results for: "{query}"')
    column_specs = (
        ("Score", "cyan", 8),
        ("Meeting", "yellow", 24),
        ("Speaker", "green", 14),
        ("Time", "magenta", 10),
        ("Snippet", "white", 60),
    )
    for header, colour, col_width in column_specs:
        table.add_column(header, style=colour, width=col_width)

    for hit in hits:
        # Show at most 80 characters, marking longer texts with an ellipsis
        snippet = hit["text"][:80]
        if len(hit["text"]) > 80:
            snippet += "…"
        table.add_row(
            f'{hit["score"]:.3f}',
            hit["meeting_id"],
            hit["speaker"],
            f'{hit["start"]:.0f}s',
            snippet,
        )

    console.print(table)
if __name__ == "__main__":
    # CLI: with --query run a semantic search, otherwise index new files.
    parser = argparse.ArgumentParser()
    for flag, options in (
        ("--query", {"type": str, "help": "Semantic search query"}),
        ("--top-k", {"type": int, "default": 10}),
        ("--model", {"type": str, "default": "small"}),
    ):
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    if not args.query:
        run_incremental_pipeline(model_size=args.model)
    else:
        search(args.query, args.top_k)
# First run: every audio file is new, so all 190 files are indexed (runs overnight)
python run.py
# Next week: files already in the index registry are skipped; only new files are processed
python run.py
# Semantic search across all indexed meetings
python run.py --query "Q3 budget discussion"
python run.py --query "product roadmap priorities"
# --top-k sets how many results are returned (default: 10)
python run.py --query "who mentioned deadlines" --top-k 20