API Reference¶
Auto-generated from source code.
Configuration¶
autorag_research.config.ExecutorConfig
dataclass
¶
Configuration for the Executor.
Attributes:

| Name | Type | Description |
|---|---|---|
| `pipelines` | `list[BasePipelineConfig]` | List of pipeline configurations to run. |
| `metrics` | `list[BaseMetricConfig]` | List of metric configurations to evaluate. |
| `max_retries` | `int` | Maximum number of retry attempts for failed pipelines. |
| `eval_batch_size` | `int` | Batch size for metric evaluation. |
| `health_check_queries` | `int` | Number of queries to run during health check before full execution. Set to 0 to disable. Defaults to 2. |
Example

```python
config = ExecutorConfig(
    pipelines=[
        BM25PipelineConfig(name="bm25_v1", tokenizer="bert"),
    ],
    metrics=[
        RecallConfig(),
        NDCGConfig(),
    ],
    max_retries=3,
)
```
autorag_research.config.BasePipelineConfig
dataclass
¶
Bases: ABC
Base configuration for all pipelines.
Subclasses should define their specific configuration parameters as dataclass fields and implement the abstract methods.
Attributes:

| Name | Type | Description |
|---|---|---|
| `name` | `str` | Unique name for this pipeline instance. |
| `description` | `str` | Optional description of the pipeline. |
| `pipeline_type` | `PipelineType` | Type of pipeline (RETRIEVAL or GENERATION). |
| `top_k` | `int` | Number of results to retrieve per query. Default: 10. |
| `batch_size` | `int` | Number of queries to fetch from DB at once. Default: 128. |
| `max_concurrency` | `int` | Maximum concurrent async operations (semaphore limit). Default: 16. |
| `max_retries` | `int` | Maximum retry attempts for failed queries (uses tenacity). Default: 3. |
| `retry_delay` | `float` | Base delay in seconds for exponential backoff between retries. Default: 1.0. |
Example

```python
@dataclass
class BM25PipelineConfig(BasePipelineConfig):
    tokenizer: str = "bert"
    index_name: str = "idx_chunk_bm25"
    pipeline_type: PipelineType = field(default=PipelineType.RETRIEVAL, init=False)

    def get_pipeline_class(self) -> Type:
        from autorag_research.pipelines.retrieval.bm25 import BM25RetrievalPipeline
        return BM25RetrievalPipeline

    def get_pipeline_kwargs(self) -> dict[str, Any]:
        return {"tokenizer": self.tokenizer, "index_name": self.index_name}
```
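The `retry_delay` attribute above seeds exponential backoff between retries. A minimal sketch of how the n-th retry's wait could be derived — illustrative only; the library delegates the actual wait policy to tenacity, which may compute it differently:

```python
def backoff_delay(retry_delay: float, attempt: int) -> float:
    # Exponential backoff: the wait doubles on each successive retry,
    # e.g. 1.0s, 2.0s, 4.0s for retry_delay=1.0 (illustrative formula,
    # not the library's exact tenacity configuration).
    return retry_delay * (2 ** (attempt - 1))
```

With `retry_delay=1.0` and `max_retries=3`, the waits under this formula would be 1.0s, 2.0s, and 4.0s.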
get_pipeline_class()
abstractmethod
¶
Return the pipeline class to instantiate.
Returns:

| Type | Description |
|---|---|
| `type[BaseRetrievalPipeline]` | The pipeline class type. |
get_pipeline_kwargs()
abstractmethod
¶
Return kwargs for pipeline constructor.
These kwargs are passed to the pipeline constructor along with session_factory, name, and schema (which are handled by Executor).
Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary of keyword arguments for the pipeline constructor. |
get_run_kwargs()
abstractmethod
¶
Return kwargs for pipeline.run() method.
Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary of keyword arguments for the run method. |
autorag_research.config.BaseMetricConfig
dataclass
¶
Bases: ABC
Base configuration for all metrics.
Subclasses should define their specific configuration parameters as dataclass fields and implement the abstract methods.
Attributes:

| Name | Type | Description |
|---|---|---|
| `metric_type` | `MetricType` | Type of metric (RETRIEVAL or GENERATION). |
Example

```python
@dataclass
class RecallConfig(BaseMetricConfig):
    metric_type: MetricType = field(default=MetricType.RETRIEVAL, init=False)

    def get_metric_name(self) -> str:
        return "retrieval_recall"

    def get_metric_func(self) -> Callable:
        from autorag_research.evaluation.metrics import retrieval_recall
        return retrieval_recall
```
get_compute_granularity()
¶
Return metric compute granularity.
Query-level metrics return one score per query and are computed batch-by-batch. Dataset-level metrics require all samples together to produce paper-aligned scores.
Returns:

| Type | Description |
|---|---|
| `Literal['query', 'dataset']` | "query" for per-query metrics (default), "dataset" for whole-dataset metrics. |
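To illustrate the distinction, here is a hedged sketch (not the library's code) of how an executor could dispatch on the granularity value: query-level metrics are scored batch by batch, while dataset-level metrics must see every sample at once.

```python
def evaluate(metric_func, granularity, batches):
    # Query-level: apply the metric to each batch and concatenate the
    # per-query scores; dataset-level: flatten all batches and apply once.
    if granularity == "query":
        return [score for batch in batches for score in metric_func(batch)]
    return metric_func([sample for batch in batches for sample in batch])
```

This mirrors the contract above: a per-query metric returns one score per input, while a dataset-level metric may return any aggregate shape.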
get_metric_func()
abstractmethod
¶
Return the metric function.
Returns:

| Type | Description |
|---|---|
| `Callable` | The callable metric function. |
get_metric_kwargs()
¶
Return optional kwargs for the metric function.
Override this method if the metric function accepts additional arguments.
Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary of keyword arguments for the metric function. |
get_metric_name()
¶
Return the metric name for database storage.
Returns:

| Type | Description |
|---|---|
| `str` | The metric name string. |
Executor¶
autorag_research.executor.Executor
¶
Orchestrates pipeline execution and metric evaluation.
The Executor coordinates:

1. Sequential execution of configured pipelines
2. Verification that all queries have results
3. Retry logic for failed pipelines
4. Metric evaluation for each pipeline (immediately after the pipeline completes)

Metric evaluation rules:

- Retrieval pipelines: only retrieval metrics are evaluated
- Generation pipelines: both retrieval AND generation metrics are evaluated
Example

```python
from autorag_research.config import ExecutorConfig
from autorag_research.executor import Executor
from autorag_research.orm.connection import DBConnection
from autorag_research.pipelines.retrieval.bm25 import BM25PipelineConfig
from autorag_research.evaluation.metrics.retrieval import RecallConfig, NDCGConfig

db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

config = ExecutorConfig(
    pipelines=[
        BM25PipelineConfig(
            name="bm25_baseline",
            tokenizer="bert",
            top_k=10,
        ),
    ],
    metrics=[
        RecallConfig(),
        NDCGConfig(),
    ],
    max_retries=3,
)

executor = Executor(session_factory, config)
result = executor.run()
```
__init__(session_factory, config, schema=None, config_dir=None)
¶
Initialize Executor.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `session_factory` | `sessionmaker[Session]` | SQLAlchemy sessionmaker for database connections. | required |
| `config` | `ExecutorConfig` | Executor configuration. | required |
| `schema` | `Any \| None` | Schema namespace from create_schema(). If None, uses default schema. | `None` |
| `config_dir` | `Path \| None` | Directory containing pipeline YAML configs. If None, attempts to use Hydra's config path if initialized, otherwise falls back to CWD/configs. | `None` |
run()
¶
Run all configured pipelines and evaluate metrics.
For each pipeline:

1. Resolve retrieval-pipeline dependencies
2. Run health check (if enabled)
3. Run the pipeline with retry logic
4. Verify completion
5. Evaluate applicable metrics (before moving to the next pipeline)
Returns:

| Type | Description |
|---|---|
| `ExecutorResult` | ExecutorResult with comprehensive execution statistics. |
Pipelines¶
Retrieval¶
autorag_research.pipelines.retrieval.base.BaseRetrievalPipeline
¶
Bases: BasePipeline, ABC
Abstract base class for all retrieval pipelines.
This class provides common functionality for retrieval pipelines:

- Service initialization
- Pipeline creation in the database
- Abstract retrieve methods for subclasses to implement
Subclasses must implement:
- _retrieve_by_id(): Async method for retrieval using query ID (query exists in DB)
- _retrieve_by_text(): Async method for retrieval using raw query text
- _get_pipeline_config(): Return the pipeline configuration dict
__init__(session_factory, name, schema=None)
¶
Initialize retrieval pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `session_factory` | `sessionmaker[Session]` | SQLAlchemy sessionmaker for database connections. | required |
| `name` | `str` | Name for this pipeline. | required |
| `schema` | `Any \| None` | Schema namespace from create_schema(). If None, uses default schema. | `None` |
retrieve(query_text, top_k=10)
async
¶
Retrieve chunks for a single query (async).
This method provides single-query retrieval, designed for use within GenerationPipeline where queries are processed one at a time.
Checks if the query exists in the DB:

- If it exists: uses _retrieve_by_id() (faster, uses the stored embedding)
- If not: uses _retrieve_by_text() (may trigger embedding computation)
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_text` | `str` | The query text to retrieve for. | required |
| `top_k` | `int` | Number of chunks to retrieve. | `10` |

Returns:

| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of dicts with 'doc_id' (chunk ID) and 'score' keys. |
run(top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)
¶
Run the retrieval pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `top_k` | `int` | Number of top documents to retrieve per query. | `10` |
| `batch_size` | `int` | Number of queries to fetch from DB at once. | `128` |
| `max_concurrency` | `int` | Maximum number of concurrent async operations. | `16` |
| `max_retries` | `int` | Maximum number of retry attempts for failed queries. | `3` |
| `retry_delay` | `float` | Base delay in seconds for exponential backoff between retries. | `1.0` |
| `query_limit` | `int \| None` | Maximum number of queries to process. None means no limit. | `None` |

Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary with pipeline execution statistics. |
autorag_research.pipelines.retrieval.bm25.BM25RetrievalPipeline
¶
Bases: BaseRetrievalPipeline
Pipeline for running VectorChord-BM25 retrieval.
This pipeline wraps RetrievalPipelineService with BM25DBModule, providing a convenient interface for BM25-based retrieval using PostgreSQL's VectorChord-BM25 extension.
BM25 does not require embeddings, so both _retrieve_by_id() and _retrieve_by_text() work without any embedding model.
Example

```python
from autorag_research.orm.connection import DBConnection
from autorag_research.pipelines.retrieval.bm25 import BM25RetrievalPipeline

db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

# Initialize pipeline
pipeline = BM25RetrievalPipeline(
    session_factory=session_factory,
    name="bm25_baseline",
    tokenizer="bert",
)

# Run pipeline on all queries in DB
results = pipeline.run(top_k=10)

# Or retrieve for a single query (async)
chunks = await pipeline.retrieve("What is machine learning?", top_k=10)
```
__init__(session_factory, name, tokenizer='bert', index_name=DEFAULT_BM25_INDEX_NAME, schema=None)
¶
Initialize BM25 retrieval pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `session_factory` | `sessionmaker[Session]` | SQLAlchemy sessionmaker for database connections. | required |
| `name` | `str` | Name for this pipeline. | required |
| `tokenizer` | `str` | Tokenizer name for BM25. Available pg_tokenizer pre-built models: "bert" (bert-base-uncased, Hugging Face, default), "wiki_tocken" (Wikitext-103 trained model), "gemma2b" (Google lightweight model, ~100MB memory), "llmlingua2" (Microsoft summarization model, ~200MB memory). See https://github.com/tensorchord/pg_tokenizer.rs/blob/main/docs/06-model.md | `'bert'` |
| `index_name` | `str` | Name of the BM25 index (default: "idx_chunk_bm25"). | `DEFAULT_BM25_INDEX_NAME` |
| `schema` | `Any \| None` | Schema namespace from create_schema(). If None, uses default schema. | `None` |
Generation¶
autorag_research.pipelines.generation.base.BaseGenerationPipeline
¶
Bases: BasePipeline, ABC
Abstract base class for all generation pipelines.
This class provides common functionality for generation pipelines:

- Composition with a retrieval pipeline for flexible retrieval strategies
- Service initialization for database operations
- Pipeline creation in the database
- Abstract generate method for subclasses to implement
Subclasses must implement:
- _generate(): Async generate method given a query ID (has access to a retrieval pipeline)
- _get_pipeline_config(): Return the pipeline configuration dict
Example

```python
class BasicRAGPipeline(BaseGenerationPipeline):
    async def _generate(self, query_id: int | str, top_k: int) -> GenerationResult:
        # Get query text (uses query_to_llm if available, else contents)
        query_text = self._get_query_text(query_id)

        # Retrieve relevant chunks by query_id (async)
        results = await self._retrieval_pipeline._retrieve_by_id(query_id, top_k)
        chunk_ids = [r["doc_id"] for r in results]
        chunk_contents = self._service.get_chunk_contents(chunk_ids)

        # Build prompt and generate (async)
        context = "\n\n".join(chunk_contents)
        prompt = f"Context:\n{context}\n\nQuestion: {query_text}\n\nAnswer:"
        response = await self._llm.ainvoke(prompt)
        return GenerationResult(text=str(response.content))
```
__init__(session_factory, name, llm, retrieval_pipeline, schema=None)
¶
Initialize generation pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `session_factory` | `sessionmaker[Session]` | SQLAlchemy sessionmaker for database connections. | required |
| `name` | `str` | Name for this pipeline. | required |
| `llm` | `BaseLanguageModel` | LangChain BaseLanguageModel instance for text generation. | required |
| `retrieval_pipeline` | `BaseRetrievalPipeline` | Retrieval pipeline instance for fetching relevant context. | required |
| `schema` | `Any \| None` | Schema namespace from create_schema(). If None, uses default schema. | `None` |
run(top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)
¶
Run the generation pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `top_k` | `int` | Number of top documents to retrieve per query. | `10` |
| `batch_size` | `int` | Number of queries to fetch from DB at once. | `128` |
| `max_concurrency` | `int` | Maximum number of concurrent async operations. | `16` |
| `max_retries` | `int` | Maximum number of retry attempts for failed queries. | `3` |
| `retry_delay` | `float` | Base delay in seconds for exponential backoff between retries. | `1.0` |
| `query_limit` | `int \| None` | Maximum number of queries to process. None means no limit. | `None` |

Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary with pipeline execution statistics. |
autorag_research.pipelines.generation.basic_rag.BasicRAGPipeline
¶
Bases: BaseGenerationPipeline
Simple single-call RAG pipeline: retrieve once -> build prompt -> generate once.
This pipeline implements the most basic RAG pattern:

1. Take a query
2. Retrieve relevant chunks using the composed retrieval pipeline
3. Build a prompt with retrieved context
4. Call the LLM once to generate the answer
The retrieval pipeline can be any BaseRetrievalPipeline implementation (BM25, vector search, hybrid, HyDE, etc.), providing flexibility in the retrieval strategy while keeping the generation simple.
Example

```python
from langchain_openai import ChatOpenAI
from autorag_research.orm.connection import DBConnection
from autorag_research.pipelines.generation.basic_rag import BasicRAGPipeline
from autorag_research.pipelines.retrieval.bm25 import BM25RetrievalPipeline

db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

# Create retrieval pipeline
retrieval_pipeline = BM25RetrievalPipeline(
    session_factory=session_factory,
    name="bm25_baseline",
    tokenizer="bert",
)

# Create generation pipeline
pipeline = BasicRAGPipeline(
    session_factory=session_factory,
    name="basic_rag_v1",
    llm=ChatOpenAI(model="gpt-4"),
    retrieval_pipeline=retrieval_pipeline,
)

# Run pipeline
results = pipeline.run(top_k=5)
```
__init__(session_factory, name, llm, retrieval_pipeline, prompt_template=DEFAULT_PROMPT_TEMPLATE, schema=None)
¶
Initialize Basic RAG pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `session_factory` | `sessionmaker[Session]` | SQLAlchemy sessionmaker for database connections. | required |
| `name` | `str` | Name for this pipeline. | required |
| `llm` | `BaseLanguageModel` | LangChain BaseLanguageModel instance for text generation. | required |
| `retrieval_pipeline` | `BaseRetrievalPipeline` | Retrieval pipeline for fetching relevant context. | required |
| `prompt_template` | `str` | Template string with {context} and {query} placeholders. | `DEFAULT_PROMPT_TEMPLATE` |
| `schema` | `Any \| None` | Schema namespace from create_schema(). If None, uses default schema. | `None` |
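The `prompt_template` contract is easy to demonstrate. The template string below is illustrative only (the actual `DEFAULT_PROMPT_TEMPLATE` is not shown in this reference); any template just needs `{context}` and `{query}` placeholders that the pipeline can fill with `str.format`:

```python
# Hypothetical template in the default's style; the real default may differ.
template = "Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"

prompt = template.format(
    context="Chunk 1 text\n\nChunk 2 text",  # retrieved chunks joined together
    query="What is machine learning?",
)
```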
Metrics¶
Retrieval¶
autorag_research.evaluation.metrics.retrieval
¶
F1Config
dataclass
¶
Bases: BaseRetrievalMetricConfig
Configuration for retrieval F1 metric.
get_metric_func()
¶
Return the metric function.
FullRecallConfig
dataclass
¶
Bases: BaseRetrievalMetricConfig
Configuration for retrieval full recall metric.
get_metric_func()
¶
Return the metric function.
MAPConfig
dataclass
¶
Bases: BaseRetrievalMetricConfig
Configuration for retrieval MAP metric.
get_metric_func()
¶
Return the metric function.
MRRConfig
dataclass
¶
Bases: BaseRetrievalMetricConfig
Configuration for retrieval MRR metric.
get_metric_func()
¶
Return the metric function.
NDCGConfig
dataclass
¶
Bases: BaseRetrievalMetricConfig
Configuration for retrieval NDCG metric.
get_metric_func()
¶
Return the metric function.
PrecisionConfig
dataclass
¶
Bases: BaseRetrievalMetricConfig
Configuration for retrieval precision metric.
get_metric_func()
¶
Return the metric function.
RecallConfig
dataclass
¶
Bases: BaseRetrievalMetricConfig
Configuration for retrieval recall metric.
get_metric_func()
¶
Return the metric function.
retrieval_f1(metric_input)
¶
Compute f1 score for retrieval.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_input` | `MetricInput` | The MetricInput schema for AutoRAG metric. | required |

Returns:

| Type | Description |
|---|---|
| `float` | The f1 score. |
retrieval_full_recall(metric_input)
¶
Compute full recall (binary) for retrieval.
Returns 1.0 if ALL ground truth groups are satisfied (at least one item from each OR-group is retrieved), 0.0 otherwise.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_input` | `MetricInput` | The MetricInput schema for AutoRAG metric. | required |

Returns:

| Type | Description |
|---|---|
| `float` | 1.0 if all GT groups are satisfied, 0.0 otherwise. |
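The OR-group semantics can be sketched in a few lines of plain Python (illustrative only; the actual implementation consumes a MetricInput object rather than raw lists):

```python
def full_recall(retrieval_gt, retrieved_ids):
    # retrieval_gt is a list of OR-groups; return 1.0 iff every group
    # has at least one member among the retrieved IDs.
    retrieved = set(retrieved_ids)
    return 1.0 if all(retrieved & set(group) for group in retrieval_gt) else 0.0
```

For GT `[["A", "B"], ["C"]]`, retrieving `["B", "C"]` satisfies both groups, while `["A", "B"]` leaves the second group unsatisfied.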
retrieval_map(metric_input)
¶
Compute MAP (Mean Average Precision) score for retrieval.
Mean Average Precision (MAP) is the mean of Average Precision (AP) for all queries.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_input` | `MetricInput` | The MetricInput schema for AutoRAG metric. | required |

Returns:

| Type | Description |
|---|---|
| `float` | The MAP score. |
retrieval_mrr(metric_input)
¶
Compute MRR (Mean Reciprocal Rank) score for retrieval.
Reciprocal Rank (RR) is the reciprocal of the rank of the first relevant item; MRR is the mean of RR over all queries.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_input` | `MetricInput` | The MetricInput schema for AutoRAG metric. | required |

Returns:

| Type | Description |
|---|---|
| `float` | The MRR score. |
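A minimal sketch of Reciprocal Rank under these semantics (illustrative; the real function takes a MetricInput, and its exact handling of AND-OR ground truth may differ from this flat-set simplification):

```python
def reciprocal_rank(retrieval_gt, retrieved_ids):
    # Flatten the AND-OR ground truth into one relevant set for this sketch,
    # then return 1/rank of the first relevant item (0.0 if none retrieved).
    relevant = {doc for group in retrieval_gt for doc in group}
    for rank, doc in enumerate(retrieved_ids, start=1):
        if doc in relevant:
            return 1.0 / rank
    return 0.0
```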
retrieval_ndcg(metric_input)
¶
Compute NDCG for multi-hop retrieval with AND-OR group semantics.
Ground truth structure: [[A, B], [C]] means (A OR B) AND C.

- Each inner list is an OR group (any item satisfies the group)
- The outer list is AND (all groups must be satisfied for complete retrieval)
A retrieved item contributes to DCG only when it's the FIRST to satisfy a previously unsatisfied group. Subsequent items from the same group don't add value (they're redundant for answering the query).
Supports graded relevance when metric_input.relevance_scores is provided.
Falls back to binary relevance (score=1) when relevance_scores is None.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_input` | `MetricInput` | The MetricInput schema for AutoRAG metric. retrieval_gt: 2D list of ground truth IDs (AND/OR structure); retrieved_ids: list of retrieved IDs; relevance_scores: optional dict mapping doc_id -> graded relevance score (e.g., 0=not relevant, 1=somewhat relevant, 2=highly relevant). | required |

Returns:

| Type | Description |
|---|---|
| `float` | The NDCG score. |
Examples:

GT: [[A, B], [C]] (need A-or-B AND C)

- Retrieved: [A, C] -> Perfect (both groups satisfied at top positions)
- Retrieved: [A, B] -> Partial (group 2 not satisfied; B is redundant)
- Retrieved: [C, A] -> Good but suboptimal ordering
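The group-aware DCG described above can be sketched as follows. This is an illustrative binary-relevance version, not the library's implementation: with purely binary gains it scores [C, A] the same as [A, C], whereas the graded-relevance path may distinguish them.

```python
import math

def and_or_ndcg(retrieval_gt, retrieved_ids):
    # An item contributes to DCG only when it is the FIRST to satisfy a
    # previously unsatisfied OR-group; later items from the same group
    # are redundant and add nothing.
    groups = [set(g) for g in retrieval_gt]
    satisfied = [False] * len(groups)
    dcg = 0.0
    for rank, doc in enumerate(retrieved_ids, start=1):
        for i, group in enumerate(groups):
            if not satisfied[i] and doc in group:
                satisfied[i] = True
                dcg += 1.0 / math.log2(rank + 1)
                break
    # Ideal ordering: one item per group at the top positions.
    idcg = sum(1.0 / math.log2(r + 1) for r in range(1, len(groups) + 1))
    return dcg / idcg if idcg else 0.0
```

For GT `[["A", "B"], ["C"]]`, retrieving `["A", "C"]` scores 1.0, while `["A", "B"]` scores below 1.0 because the second group is never satisfied.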
retrieval_precision(metric_input)
¶
Compute precision score for retrieval.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_input` | `MetricInput` | The MetricInput schema for AutoRAG metric. | required |

Returns:

| Type | Description |
|---|---|
| `float` | The precision score. |
retrieval_recall(metric_input)
¶
Compute recall score for retrieval.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_input` | `MetricInput` | The MetricInput schema for AutoRAG metric. | required |

Returns:

| Type | Description |
|---|---|
| `float` | The recall score. |
Generation¶
autorag_research.evaluation.metrics.generation
¶
BertScoreConfig
dataclass
¶
Bases: BaseGenerationMetricConfig
Configuration for BERTScore metric.
Attributes:

| Name | Type | Description |
|---|---|---|
| `lang` | `str` | Language code for the text. |
| `batch` | `int` | Batch size for processing. |
| `n_threads` | `int \| None` | Number of threads to use. |
BleuConfig
dataclass
¶
Bases: BaseGenerationMetricConfig
Configuration for BLEU metric.
Attributes:

| Name | Type | Description |
|---|---|---|
| `tokenize` | `str \| None` | The tokenizer to use. If None, defaults to language-specific tokenizers. |
| `smooth_method` | `str` | The smoothing method ('floor', 'add-k', 'exp' or 'none'). |
| `smooth_value` | `float \| None` | The smoothing value for 'floor' and 'add-k' methods. |
| `max_ngram_order` | `int` | Maximum n-gram order when computing precisions. |
| `effective_order` | `bool` | Stop including n-gram orders for which precision is 0. |
MeteorConfig
dataclass
¶
Bases: BaseGenerationMetricConfig
Configuration for METEOR metric.
Attributes:

| Name | Type | Description |
|---|---|---|
| `alpha` | `float` | Parameter for controlling relative weights of precision and recall. |
| `beta` | `float` | Parameter for controlling the shape of the penalty as a function of fragmentation. |
| `gamma` | `float` | Relative weight assigned to the fragmentation penalty. |
ResponseRelevancyConfig
dataclass
¶
RougeConfig
dataclass
¶
Bases: BaseGenerationMetricConfig
Configuration for ROUGE metric.
Attributes:

| Name | Type | Description |
|---|---|---|
| `rouge_type` | `str` | Rouge type to use ('rouge1', 'rouge2', 'rougeL', 'rougeLSum'). |
| `use_stemmer` | `bool` | Whether to use the Porter stemmer for word suffix stripping. |
| `split_summaries` | `bool` | Whether to add newlines between sentences for rougeLsum. |
SemScoreConfig
dataclass
¶
Bases: BaseGenerationMetricConfig
Configuration for SemScore (semantic similarity) metric.
Attributes:

| Name | Type | Description |
|---|---|---|
| `embedding_model` | `Embeddings \| str` | Embedding model config name (e.g., "openai-large") or Embeddings instance. |
| `truncate_length` | `int` | Maximum length of texts to embed. |
bert_score(metric_inputs, lang='en', batch=128, n_threads=os.cpu_count())
¶
Compute BERTScore metric for generation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_inputs` | `list[MetricInput]` | A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts"). | required |
| `lang` | `str` | Language code for the text. Default is "en". | `'en'` |
| `batch` | `int` | Batch size for processing. Default is 128. | `128` |
| `n_threads` | `int \| None` | Number of threads to use. Default is the number of CPU cores. | `cpu_count()` |

Returns:

| Type | Description |
|---|---|
| `list[float]` | A list of BERTScore F1 scores. |
bleu(metric_inputs, tokenize=None, smooth_method='exp', smooth_value=None, max_ngram_order=4, trg_lang='', effective_order=True, **kwargs)
¶
Computes the BLEU metric given pred and ground-truth.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_inputs` | `list[MetricInput]` | A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts"). | required |
| `tokenize` | `str \| None` | The tokenizer to use. If None, defaults to language-specific tokenizers with '13a' as the fallback default. See https://github.com/mjpost/sacrebleu/blob/master/sacrebleu/metrics/bleu.py | `None` |
| `smooth_method` | `str` | The smoothing method to use ('floor', 'add-k', 'exp' or 'none'). | `'exp'` |
| `smooth_value` | `float \| None` | The smoothing value for the 'floor' and 'add-k' methods. | `None` |
| `max_ngram_order` | `int` | If given, it overrides the maximum n-gram order (default: 4) when computing precisions. | `4` |
| `trg_lang` | `str` | An optional language code to raise potential tokenizer warnings. | `''` |
| `effective_order` | `bool` | If True, stop including n-gram orders for which precision is 0. | `True` |
| `**kwargs` | `Any` | Additional arguments. | `{}` |

Returns:

| Type | Description |
|---|---|
| `list[float]` | A list of BLEU scores. |
huggingface_evaluate(instance, key, metric_inputs, **kwargs)
¶
Compute huggingface evaluate metric.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `instance` | `Any` | The instance of a Hugging Face evaluate metric. | required |
| `key` | `str` | The key to retrieve the result score from the Hugging Face evaluate result. | required |
| `metric_inputs` | `list[MetricInput]` | A list of MetricInput schema. | required |
| `**kwargs` | `Any` | Additional arguments for the metric function. | `{}` |

Returns:

| Type | Description |
|---|---|
| `list[float]` | The list of scores. |
meteor(metric_inputs, alpha=0.9, beta=3.0, gamma=0.5)
¶
Compute meteor score for generation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_inputs` | `list[MetricInput]` | A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts"). | required |
| `alpha` | `float` | Parameter for controlling relative weights of precision and recall. Default is 0.9. | `0.9` |
| `beta` | `float` | Parameter for controlling the shape of the penalty as a function of fragmentation. Default is 3.0. | `3.0` |
| `gamma` | `float` | Relative weight assigned to the fragmentation penalty. Default is 0.5. | `0.5` |

Returns:

| Type | Description |
|---|---|
| `list[float]` | A list of computed metric scores. |
response_relevancy(metric_inputs, llm, embedding_model, strictness=3, prompt_template=DEFAULT_RESPONSE_RELEVANCY_PROMPT)
¶
RAGAS-style response relevancy metric without ragas dependency.
rouge(metric_inputs, rouge_type='rougeL', use_stemmer=False, split_summaries=False)
¶
Compute rouge score for generation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_inputs` | `list[MetricInput]` | A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts"). | required |
| `rouge_type` | `str` | Rouge type to use for evaluation: 'rouge1' (unigram-based scoring), 'rouge2' (bigram-based scoring), 'rougeL' (longest common subsequence-based scoring), or 'rougeLSum' (splits text on newlines before applying rougeL). Default is 'rougeL'. | `'rougeL'` |
| `use_stemmer` | `bool` | Whether the Porter stemmer should be used to strip word suffixes to improve matching. Used by the DefaultTokenizer; other tokenizers may ignore it. Default is False. | `False` |
| `split_summaries` | `bool` | Whether to add newlines between sentences for rougeLsum. Default is False. | `False` |

Returns:

| Type | Description |
|---|---|
| `list[float]` | A list of computed metric scores. |
sem_score(metric_inputs, embedding_model, truncate_length=4096)
¶
Compute sem score between generation gt and pred with cosine similarity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metric_inputs` | `list[MetricInput]` | A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts"). | required |
| `embedding_model` | `Embeddings \| str` | Embedding model used to compute cosine similarity. Can be an Embeddings instance or a string config name (e.g., "openai-large"). | required |
| `truncate_length` | `int` | Maximum length of texts to embed. Default is 4096. | `4096` |

Returns:

| Type | Description |
|---|---|
| `list[float]` | A list of computed metric scores. |
Data Ingestion¶
autorag_research.data.base.DataIngestor
¶
Bases: ABC
detect_primary_key_type()
abstractmethod
¶
Detect the primary key type used in the dataset.
ingest(subset='test', query_limit=None, min_corpus_cnt=None)
abstractmethod
¶
Ingest data from the specified source. This step does not include embedding.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `subset` | `Literal['train', 'dev', 'test']` | Dataset split to ingest (train, dev, or test). | `'test'` |
| `query_limit` | `int \| None` | Maximum number of queries to ingest. None means no limit. | `None` |
| `min_corpus_cnt` | `int \| None` | Maximum number of corpus items to ingest. When set, gold IDs from selected queries are always included, plus random samples to reach the limit. None means no limit. | `None` |
autorag_research.data.base.TextEmbeddingDataIngestor
¶
Bases: DataIngestor, ABC
embed_all(max_concurrency=16, batch_size=128)
¶
Embed all queries and text chunks.
autorag_research.data.base.MultiModalEmbeddingDataIngestor
¶
Bases: DataIngestor, ABC
ORM¶
Repository¶
autorag_research.orm.repository
¶
Repository module for AutoRAG-Research ORM.
This module provides repository classes for data access layer operations.
BaseVectorRepository
¶
Bases: GenericRepository[T]
Base repository with vector search capabilities.
Extends GenericRepository with vector search methods for use with pgvector and VectorChord for efficient similarity search.
maxsim_search(query_vectors, vector_column='embeddings', limit=10)
¶
Perform MaxSim search using VectorChord's @# operator.
MaxSim computes late interaction similarity: for each query vector, find the closest document vector, compute dot product, and sum results.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_vectors` | `list[list[float]]` | List of query embedding vectors (multi-vector query). | required |
| `vector_column` | `str` | Name of the multi-vector column to search. | `'embeddings'` |
| `limit` | `int` | Maximum number of results to return. | `10` |

Returns:

| Type | Description |
|---|---|
| `list[tuple[T, float]]` | List of tuples (entity, distance_score) ordered by similarity. Lower distance scores indicate higher similarity. The distance score is calculated as (1 - maxsim_score), so its range is [-infinity, 0]. You might normalize this score by dividing by the number of query vectors. |
Note
Requires VectorChord extension and vchordrq index with vector_maxsim_ops. Example index: CREATE INDEX ON table USING vchordrq (embeddings vector_maxsim_ops);
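The MaxSim score that the @# operator computes can be expressed in plain Python (an illustrative reference formula, not the extension's implementation; the returned distance is then 1 - maxsim_score, as noted above):

```python
def maxsim(query_vectors, doc_vectors):
    # Late interaction: for each query vector, take the maximum dot
    # product over all document vectors, then sum across query vectors.
    return sum(
        max(sum(qi * di for qi, di in zip(q, d)) for d in doc_vectors)
        for q in query_vectors
    )
```

With identical query and document multi-vectors of two unit vectors, each query vector matches itself with dot product 1, giving a MaxSim of 2.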
maxsim_search_with_ids(query_vectors, vector_column='embeddings', id_column='id', limit=10)
¶
Perform MaxSim search and return only IDs with scores.
This is more efficient when you only need IDs and scores.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_vectors` | `list[list[float]]` | List of query embedding vectors (multi-vector query). | required |
| `vector_column` | `str` | Name of the multi-vector column to search. | `'embeddings'` |
| `id_column` | `str` | Name of the primary key column. | `'id'` |
| `limit` | `int` | Maximum number of results to return. | `10` |

Returns:

| Type | Description |
|---|---|
| `list[tuple[int \| str, float]]` | List of tuples (entity_id, distance_score) ordered by similarity. |
set_multi_vector_embedding(entity_id, embeddings, vector_column='embeddings', id_column='id')
¶
Set multi-vector embedding for an entity using raw SQL.
This method bypasses SQLAlchemy's type processing to properly format vector arrays for VectorChord compatibility.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `int \| str` | The entity's primary key. | required |
| `embeddings` | `list[list[float]]` | List of embedding vectors (list of list of floats). | required |
| `vector_column` | `str` | Name of the multi-vector column (default: "embeddings"). | `'embeddings'` |
| `id_column` | `str` | Name of the primary key column (default: "id"). | `'id'` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if the update was successful, False otherwise. |
set_multi_vector_embeddings_batch(entity_ids, embeddings_list, vector_column='embeddings', id_column='id')
¶
Batch set multi-vector embeddings for multiple entities.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_ids
|
list[int | str]
|
List of entity primary keys. |
required |
embeddings_list
|
list[list[list[float]]]
|
List of multi-vector embeddings (one per entity). |
required |
vector_column
|
str
|
Name of the multi-vector column (default: "embeddings"). |
'embeddings'
|
id_column
|
str
|
Name of the primary key column (default: "id"). |
'id'
|
Returns:
| Type | Description |
|---|---|
int
|
Number of entities successfully updated. |
vector_search(query_vector, vector_column='embedding', limit=10)
¶
Perform vector similarity search using VectorChord's cosine distance.
Uses raw SQL with VectorChord's <=> operator for cosine distance. This approach avoids SQLAlchemy type processing issues with Vector objects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_vector
|
list[float]
|
The query embedding vector as a plain Python list of floats. |
required |
vector_column
|
str
|
Name of the vector column to search. |
'embedding'
|
limit
|
int
|
Maximum number of results to return. |
10
|
Returns:
| Type | Description |
|---|---|
list[T]
|
List of entities ordered by similarity (most similar first). |
Note
Requires VectorChord extension and vchordrq index on the embedding column. Example index: CREATE INDEX ON table USING vchordrq (embedding vector_cosine_ops);
vector_search_with_scores(query_vector, vector_column='embedding', limit=10)
¶
Perform vector similarity search using VectorChord's cosine distance.
Uses raw SQL with VectorChord's <=> operator for cosine distance. This approach avoids SQLAlchemy type processing issues with Vector objects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_vector
|
list[float]
|
The query embedding vector as a plain Python list of floats. |
required |
vector_column
|
str
|
Name of the vector column to search. |
'embedding'
|
limit
|
int
|
Maximum number of results to return. |
10
|
Returns:
| Type | Description |
|---|---|
list[tuple[T, float]]
|
List of tuples (entity, distance_score) ordered by similarity. |
list[tuple[T, float]]
|
Lower distance scores indicate higher similarity. |
list[tuple[T, float]]
|
The score is the cosine distance, which is calculated as (1 - cosine_similarity). |
list[tuple[T, float]]
|
0 means identical, 2 means opposite, and 1 means orthogonal. |
Note
Requires VectorChord extension and vchordrq index on the embedding column. Example index: CREATE INDEX ON table USING vchordrq (embedding vector_cosine_ops);
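The cosine-distance semantics above (0 identical, 1 orthogonal, 2 opposite) can be checked with a small pure-Python sketch; `cosine_distance` is a hypothetical helper mirroring what the `<=>` operator computes, not part of the repository API:

```python
import math

def cosine_distance(a, b):
    # distance = 1 - cosine_similarity: 0 means identical direction,
    # 1 means orthogonal, 2 means opposite direction.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

d_same = cosine_distance([1.0, 0.0], [2.0, 0.0])    # identical direction
d_orth = cosine_distance([1.0, 0.0], [0.0, 3.0])    # orthogonal
d_opp = cosine_distance([1.0, 0.0], [-1.0, 0.0])    # opposite direction
```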
ChunkRepository
¶
Bases: BaseVectorRepository[Any], BaseEmbeddingRepository[Any]
Repository for Chunk entity with vector search capabilities.
__init__(session, model_cls=None)
¶
Initialize chunk repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The Chunk model class to use. If None, uses default schema. |
None
|
bm25_search(query_text, index_name='idx_chunk_bm25', limit=10, tokenizer='bert')
¶
Perform VectorChord BM25 search.
Uses VectorChord-BM25's <&> operator for full-text search. Returns entities with their BM25 scores (converted to positive values).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_text
|
str
|
The query text to search for. |
required |
index_name
|
str
|
Name of the BM25 index (default: "idx_chunk_bm25"). |
'idx_chunk_bm25'
|
limit
|
int
|
Maximum number of results to return. |
10
|
tokenizer
|
str
|
Tokenizer to use for query (default: "bert"). Available tokenizers (pg_tokenizer pre-built models): - "bert": bert-base-uncased (Hugging Face) - Default - "wiki_tocken": Wikitext-103 trained model - "gemma2b": Google lightweight model (~100MB memory) - "llmlingua2": Microsoft summarization model (~200MB memory) See: https://github.com/tensorchord/pg_tokenizer.rs/blob/main/docs/06-model.md |
'bert'
|
Returns:
| Type | Description |
|---|---|
list[tuple[Any, float]]
|
List of tuples (entity, score) ordered by relevance. |
list[tuple[Any, float]]
|
Higher scores indicate higher relevance. |
Note
BM25 scores from VectorChord are negative (more negative = more relevant). This method negates the scores so higher = more relevant.
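The negation described in the note can be sketched as follows; the chunk IDs and raw scores are invented for illustration:

```python
# Raw results as (chunk_id, raw_score) pairs from the <&> operator;
# more negative means more relevant.
raw = [(101, -7.2), (205, -3.1), (330, -5.6)]

# Negate each score so higher = more relevant, then rank best-first,
# mirroring what bm25_search returns.
ranked = sorted(
    ((chunk_id, -score) for chunk_id, score in raw),
    key=lambda pair: pair[1],
    reverse=True,
)
```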
get_by_contents_exact(contents)
¶
Retrieve chunks with exact contents match.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
contents
|
str
|
The exact contents to search for. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of chunks with matching contents. |
get_by_table_type(table_type)
¶
Retrieve chunks with a specific table_type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_type
|
str
|
The table format type (e.g., 'markdown', 'xml', 'html'). |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of chunks with the specified table_type. |
get_chunks_with_empty_content(limit=None)
¶
Retrieve chunks that have empty or whitespace-only contents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int | None
|
Maximum number of results to return. |
None
|
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of chunks with empty content. |
get_non_table_chunks()
¶
Retrieve chunks that are not tables (is_table=False).
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of chunks where is_table is False. |
get_table_chunks()
¶
Retrieve chunks that are tables (is_table=True).
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of chunks where is_table is True. |
get_with_all_relations(chunk_id)
¶
Retrieve a chunk with all relationships eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk_id
|
int | str
|
The chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The chunk with all relations loaded, None if not found. |
get_with_chunk_retrieved_results(chunk_id)
¶
Retrieve a chunk with its chunk retrieved results eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk_id
|
int | str
|
The chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The chunk with chunk retrieved results loaded, None if not found. |
get_with_page_chunk_relations(chunk_id)
¶
Retrieve a chunk with its page-chunk relations eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk_id
|
int | str
|
The chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The chunk with page-chunk relations loaded, None if not found. |
get_with_retrieval_relations(chunk_id)
¶
Retrieve a chunk with its retrieval relations eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk_id
|
int | str
|
The chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The chunk with retrieval relations loaded, None if not found. |
search_by_contents(search_text)
¶
Search chunks by contents using SQL LIKE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
search_text
|
str
|
The text to search for (use % as wildcard). |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of matching chunks. |
ChunkRetrievedResultRepository
¶
Bases: BaseRetrievedResultRepository
Repository for ChunkRetrievedResult entity.
__init__(session, model_cls=None)
¶
Initialize chunk retrieved result repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The ChunkRetrievedResult model class to use. If None, uses default schema. |
None
|
DocumentRepository
¶
Bases: GenericRepository
Repository for Document entity with specialized queries.
__init__(session, model_cls=None)
¶
Initialize document repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The Document model class. If None, uses default schema. |
None
|
count_pages(document_id)
¶
Count the number of pages in a document.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
int | str
|
The document ID. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of pages in the document. |
get_all_with_pages(limit=None, offset=None)
¶
Retrieve all documents with their pages eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int | None
|
Maximum number of results to return. |
None
|
offset
|
int | None
|
Number of results to skip. |
None
|
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of documents with pages loaded. |
get_by_author(author)
¶
Retrieve all documents by a specific author.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
author
|
str
|
The author name to search for. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of documents by the author. |
get_by_filename(filename)
¶
Retrieve a document by its filename.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filename
|
str
|
The filename to search for. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The document if found, None otherwise. |
get_by_path_id(path_id)
¶
Retrieve a document by its file path ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path_id
|
int | str
|
The file path ID to search for. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The document if found, None otherwise. |
get_by_title(title)
¶
Retrieve a document by its title.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
title
|
str
|
The title to search for. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The document if found, None otherwise. |
get_with_file(document_id)
¶
Retrieve a document with its file eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
int | str
|
The document ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The document with file loaded, None if not found. |
get_with_pages(document_id)
¶
Retrieve a document with its pages eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
int | str
|
The document ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The document with pages loaded, None if not found. |
search_by_metadata(metadata_key, metadata_value)
¶
Search documents by metadata field.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metadata_key
|
str
|
The key in the JSONB metadata field. |
required |
metadata_value
|
str
|
The value to search for. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of matching documents. |
ExecutorResultRepository
¶
Bases: GenericRepository[Any]
Repository for ExecutorResult entity with composite key support.
__init__(session, model_cls=None)
¶
Initialize executor result repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The ExecutorResult model class to use. If None, uses default schema. |
None
|
delete_by_composite_key(query_id, pipeline_id)
¶
Delete an executor result by its composite primary key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_id
|
int | str
|
The query ID. |
required |
pipeline_id
|
int | str
|
The pipeline ID. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the result was deleted, False if not found. |
delete_by_pipeline(pipeline_id)
¶
Delete all executor results for a specific pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pipeline_id
|
int | str
|
The pipeline ID. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted records. |
exists_by_composite_key(query_id, pipeline_id)
¶
Check if an executor result exists with the given composite key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_id
|
int | str
|
The query ID. |
required |
pipeline_id
|
int | str
|
The pipeline ID. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the result exists, False otherwise. |
get_by_composite_key(query_id, pipeline_id)
¶
Retrieve an executor result by its composite primary key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_id
|
int | str
|
The query ID. |
required |
pipeline_id
|
int | str
|
The pipeline ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The executor result if found, None otherwise. |
get_by_execution_time_range(pipeline_id, min_time, max_time)
¶
Retrieve executor results within an execution time range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pipeline_id
|
int | str
|
The pipeline ID. |
required |
min_time
|
int
|
Minimum execution time (inclusive). |
required |
max_time
|
int
|
Maximum execution time (inclusive). |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of executor results within the specified range. |
get_by_pipeline_id(pipeline_id)
¶
Retrieve all executor results for a specific pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pipeline_id
|
int | str
|
The pipeline ID. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of executor results for the pipeline. |
get_by_queries_and_pipeline(query_ids, pipeline_id)
¶
Retrieve executor results for multiple queries under a specific pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_ids
|
list[int | str]
|
List of query IDs. |
required |
pipeline_id
|
int | str
|
The pipeline ID. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of executor results matching the criteria. |
get_by_query_id(query_id)
¶
Retrieve all executor results for a specific query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_id
|
int | str
|
The query ID. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of executor results for the query. |
get_with_all_relations(query_id, pipeline_id)
¶
Retrieve an executor result with all relations eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_id
|
int | str
|
The query ID. |
required |
pipeline_id
|
int | str
|
The pipeline ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The executor result with all relations loaded, None if not found. |
FileRepository
¶
Bases: GenericRepository
Repository for File entity with specialized queries.
__init__(session, model_cls=None)
¶
Initialize file repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The File model class. If None, uses default schema. |
None
|
count_by_type(file_type)
¶
Count the number of files of a specific type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
file_type
|
str
|
The file type to count. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of files of the specified type. |
get_all_by_type(file_type, limit=None, offset=None)
¶
Retrieve all files of a specific type with pagination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
file_type
|
str
|
The file type to filter by. |
required |
limit
|
int | None
|
Maximum number of results to return. |
None
|
offset
|
int | None
|
Number of results to skip. |
None
|
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of files of the specified type. |
get_all_types()
¶
Get all unique file types in the database.
Returns:
| Type | Description |
|---|---|
list[ColumnElement[Any]]
|
List of unique file types. |
get_by_path(path)
¶
Retrieve a file by its path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
The file path to search for. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The file if found, None otherwise. |
get_by_type(file_type)
¶
Retrieve all files of a specific type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
file_type
|
str
|
The file type (raw, image, audio, video). |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of files of the specified type. |
get_with_documents(file_id)
¶
Retrieve a file with its documents eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
file_id
|
int | str
|
The file ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The file with documents loaded, None if not found. |
search_by_path_pattern(pattern)
¶
Search files by path pattern using SQL LIKE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pattern
|
str
|
The pattern to search for (use % as wildcard). |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of matching files. |
GenericRepository
¶
Bases: Generic[T]
Generic repository implementing common CRUD operations.
This base class provides reusable database operations that can be extended by specific repositories for custom business logic.
__init__(session, model_cls)
¶
Initialize repository with a session and model class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type[T]
|
The SQLAlchemy model class this repository manages. |
required |
add(entity)
¶
Add a new entity to the session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
T
|
The entity instance to add. |
required |
Returns:
| Type | Description |
|---|---|
T
|
The added entity. |
add_all(entities)
¶
Add multiple entities to the session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entities
|
list[T]
|
List of entity instances to add. |
required |
Returns:
| Type | Description |
|---|---|
list[T]
|
The added entities. |
add_bulk(items)
¶
Memory-efficient bulk insert using SQLAlchemy Core.
Unlike add_all(), this method does not create ORM objects in Python memory. Instead, it uses SQLAlchemy Core's insert() which generates a single multi-row INSERT statement, significantly reducing memory usage and improving performance for large batch inserts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
list[dict]
|
List of dictionaries representing records to insert. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of inserted IDs. |
Note
For 1000 records, this method uses ~3-5x less memory than add_all() because it bypasses ORM object creation and identity map tracking. String values are automatically sanitized to remove NUL bytes for PostgreSQL compatibility.
add_bulk_skip_duplicates(items)
¶
Memory-efficient bulk insert that skips rows with duplicate primary keys.
Uses PostgreSQL's INSERT ... ON CONFLICT DO NOTHING to silently skip conflicting rows in a single SQL statement. This is useful when ingesting datasets that may contain duplicate primary keys (e.g., RAGBench).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
list[dict]
|
List of dictionaries representing records to insert. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of inserted IDs (excludes skipped duplicates). |
Note
String values are automatically sanitized to remove NUL bytes for PostgreSQL compatibility.
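The NUL-byte sanitization mentioned in the notes for both bulk methods amounts to stripping `\x00` from string fields, since PostgreSQL text columns reject NUL bytes. A minimal sketch of that idea (`sanitize_record` is a hypothetical helper, not the library's internal function):

```python
def sanitize_record(record):
    # Strip NUL bytes (\x00) from every string value; other types pass
    # through unchanged.
    return {
        key: value.replace("\x00", "") if isinstance(value, str) else value
        for key, value in record.items()
    }

clean = sanitize_record({"id": 1, "contents": "hello\x00world"})
```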
count()
¶
Count total number of entities.
Returns:
| Type | Description |
|---|---|
int
|
Total count of entities. |
delete(entity)
¶
Delete an entity from the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
T
|
The entity instance to delete. |
required |
delete_by_id(_id)
¶
Delete an entity by its primary key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
_id
|
Any
|
The primary key value. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if entity was deleted, False if not found. |
exists(_id)
¶
Check if an entity exists by its primary key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
_id
|
Any
|
The primary key value. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if entity exists, False otherwise. |
get_all(limit=None, offset=None)
¶
Retrieve all entities of this type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int | None
|
Maximum number of results to return. |
None
|
offset
|
int | None
|
Number of results to skip. |
None
|
Returns:
| Type | Description |
|---|---|
list[T]
|
List of all entities. |
get_by_id(_id)
¶
Retrieve an entity by its primary key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
_id
|
Any
|
The primary key value. |
required |
Returns:
| Type | Description |
|---|---|
T | None
|
The entity if found, None otherwise. |
get_by_ids(ids)
¶
Retrieve multiple entities by their primary keys.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ids
|
list[Any]
|
List of primary key values. |
required |
Returns:
| Type | Description |
|---|---|
list[T]
|
List of entities found (may be fewer than requested if some don't exist). |
update(entity)
¶
Update an existing entity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
T
|
The entity instance to update. |
required |
Returns:
| Type | Description |
|---|---|
T
|
The updated entity. |
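The CRUD contract above can be illustrated with a toy in-memory stand-in. This is not the actual class — the real `GenericRepository` wraps a SQLAlchemy session and model class, and its `add` takes only the entity (whose primary key lives on the entity itself) — but the method semantics match:

```python
class InMemoryRepository:
    """Toy analogue of GenericRepository's CRUD contract (illustrative only)."""

    def __init__(self):
        self._rows = {}  # primary key -> entity

    def add(self, _id, entity):
        # Hypothetical signature: takes an explicit key for simplicity.
        self._rows[_id] = entity
        return entity

    def get_by_id(self, _id):
        return self._rows.get(_id)

    def get_by_ids(self, ids):
        # May return fewer entities than requested if some IDs don't exist.
        return [self._rows[i] for i in ids if i in self._rows]

    def exists(self, _id):
        return _id in self._rows

    def delete_by_id(self, _id):
        # True if an entity was deleted, False if not found.
        return self._rows.pop(_id, None) is not None

    def count(self):
        return len(self._rows)

repo = InMemoryRepository()
repo.add(1, {"contents": "first chunk"})
repo.add(2, {"contents": "second chunk"})
```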
ImageChunkRepository
¶
Bases: BaseVectorRepository[Any], BaseEmbeddingRepository[Any]
Repository for ImageChunk entity with vector search capabilities.
__init__(session, model_cls=None)
¶
Initialize image chunk repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The ImageChunk model class to use. If None, uses default schema. |
None
|
count_by_page(page_id)
¶
Count the number of image chunks for a specific page.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
page_id
|
int | str
|
The page ID. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of image chunks for the page. |
get_by_page_id(page_id)
¶
Retrieve all image chunks for a specific page.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
page_id
|
int | str
|
The page ID. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of image chunks belonging to the page. |
get_with_all_relations(image_chunk_id)
¶
Retrieve an image chunk with all relationships eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image_chunk_id
|
int | str
|
The image chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The image chunk with all relations loaded, None if not found. |
get_with_image_chunk_retrieved_results(image_chunk_id)
¶
Retrieve an image chunk with its image chunk retrieved results eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image_chunk_id
|
int | str
|
The image chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The image chunk with image chunk retrieved results loaded, None if not found. |
get_with_page(image_chunk_id)
¶
Retrieve an image chunk with its page eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image_chunk_id
|
int | str
|
The image chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The image chunk with page loaded, None if not found. |
get_with_retrieval_relations(image_chunk_id)
¶
Retrieve an image chunk with its retrieval relations eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image_chunk_id
|
int | str
|
The image chunk ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The image chunk with retrieval relations loaded, None if not found. |
MetricRepository
¶
Bases: GenericRepository[Any]
Repository for Metric entity with relationship loading capabilities.
__init__(session, model_cls=None)
¶
Initialize metric repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The Metric model class to use. If None, uses default schema. |
None
|
exists_by_name(name)
¶
Check if a metric exists with the given name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The metric name to check. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if a metric exists, False otherwise. |
exists_by_name_and_type(name, metric_type)
¶
Check if a metric exists with the given name and type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The metric name to check. |
required |
metric_type
|
str
|
The metric type (retrieval or generation). |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if a metric exists, False otherwise. |
get_all_generation_metrics()
¶
Retrieve all generation metrics.
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of all generation metrics ordered by name. |
get_all_retrieval_metrics()
¶
Retrieve all retrieval metrics.
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of all retrieval metrics ordered by name. |
get_by_name(name)
¶
Retrieve a metric by its name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The metric name to search for. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The metric if found, None otherwise. |
get_by_name_and_type(name, metric_type)
¶
Retrieve a metric by its name and type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The metric name to search for. |
required |
metric_type
|
str
|
The metric type (retrieval or generation). |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The metric if found, None otherwise. |
get_by_type(metric_type)
¶
Retrieve all metrics of a specific type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metric_type
|
str
|
The metric type (retrieval or generation). |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of metrics of the specified type. |
get_with_all_relations(metric_id)
¶
Retrieve a metric with all relations eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metric_id
|
int | str
|
The metric ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The metric with all relations loaded, None if not found. |
get_with_summaries(metric_id)
¶
Retrieve a metric with its summaries eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metric_id
|
int | str
|
The metric ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The metric with summaries loaded, None if not found. |
search_by_name(search_text, limit=10)
¶
Search metrics containing the specified text in their name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
search_text
|
str
|
Text to search for in metric names. |
required |
limit
|
int
|
Maximum number of results to return. |
10
|
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of metrics containing the search text. |
PageRepository
¶
Bases: GenericRepository
Repository for Page entity with specialized queries.
__init__(session, model_cls=None)
¶
Initialize page repository.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session
|
SQLAlchemy session for database operations. |
required |
model_cls
|
type | None
|
The Page model class. If None, uses default schema. |
None
|
count_by_document(document_id)
¶
Count the number of pages in a document.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
int | str
|
The document ID. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of pages in the document. |
get_all_with_document(limit=None, offset=None)
¶
Retrieve all pages with their documents eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int | None
|
Maximum number of results to return. |
None
|
offset
|
int | None
|
Number of results to skip. |
None
|
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of pages with documents loaded. |
get_by_document_and_page_num(document_id, page_num)
¶
Retrieve a specific page by document ID and page number.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
int | str
|
The document ID. |
required |
page_num
|
int
|
The page number. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The page if found, None otherwise. |
get_by_document_id(document_id)
¶
Retrieve all pages for a specific document.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
int | str
|
The document ID. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of pages belonging to the document. |
get_page_range(document_id, start_page, end_page)
¶
Retrieve a range of pages from a document.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
int | str
|
The document ID. |
required |
start_page
|
int
|
The starting page number (inclusive). |
required |
end_page
|
int
|
The ending page number (inclusive). |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of pages in the specified range. |
get_with_chunks(page_id)
¶
Retrieve a page with its chunks eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
page_id
|
int | str
|
The page ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The page with chunks loaded, None if not found. |
get_with_document(page_id)
¶
Retrieve a page with its document eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
page_id
|
int | str
|
The page ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The page with document loaded, None if not found. |
get_with_image_chunks(page_id)
¶
Retrieve a page with its image chunks eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
page_id
|
int | str
|
The page ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The page with image chunks loaded, None if not found. |
get_with_page_chunk_relations(page_id)
¶
Retrieve a page with its page-chunk relations eagerly loaded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
page_id
|
int | str
|
The page ID. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The page with page-chunk relations loaded, None if not found. |
search_by_metadata(metadata_key, metadata_value)
¶
Search pages by metadata field.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metadata_key
|
str
|
The key in the JSONB metadata field. |
required |
metadata_value
|
str
|
The value to search for. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of matching pages. |
PipelineRepository¶

Bases: GenericRepository[Any]

Repository for Pipeline entity with relationship loading capabilities.

__init__(session, model_cls=None)¶

Initialize pipeline repository.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session | Session | SQLAlchemy session for database operations. | required |
| model_cls | type \| None | The Pipeline model class to use. If None, uses default schema. | None |

exists_by_name(name)¶

Check if a pipeline with the given name exists.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The pipeline name to check. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if pipeline exists, False otherwise. |

get_all_ordered_by_name()¶

Retrieve all pipelines ordered by name.

Returns:

| Type | Description |
|---|---|
| list[Any] | List of all pipelines ordered alphabetically by name. |

get_by_config_key(key, value)¶

Retrieve pipelines with a specific config key-value pair.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The config key to search for. | required |
| value | str \| int \| float \| bool | The config value to match. | required |

Returns:

| Type | Description |
|---|---|
| list[Any] | List of pipelines with matching config. |

Note

Uses the JSONB containment operator (@>) for efficient config searching.
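The containment semantics behind get_by_config_key can be illustrated without a database. This is a sketch of PostgreSQL's @> operator for flat config dicts only (nested containment has extra rules), not the repository's actual SQL:

```python
# JSONB containment, modeled in plain Python for flat dicts: the stored
# config "contains" the fragment when every key/value pair of the
# fragment appears in the stored config.
def jsonb_contains(doc: dict, fragment: dict) -> bool:
    return all(doc.get(k) == v for k, v in fragment.items())

configs = [
    {"type": "bm25", "tokenizer": "bert"},
    {"type": "bm25", "tokenizer": "unicode"},
    {"type": "vector", "model": "bge-m3"},
]

# Roughly equivalent to: config @> '{"tokenizer": "bert"}'
matches = [c for c in configs if jsonb_contains(c, {"tokenizer": "bert"})]
```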
get_by_name(name)¶

Retrieve a pipeline by name.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The pipeline name. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The pipeline if found, None otherwise. |

get_with_all_relations(pipeline_id)¶

Retrieve a pipeline with all relations eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | The pipeline ID. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The pipeline with all relations loaded, None if not found. |

get_with_executor_results(pipeline_id)¶

Retrieve a pipeline with its executor results eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | The pipeline ID. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The pipeline with executor results loaded, None if not found. |

get_with_retrieved_results(pipeline_id)¶

Retrieve a pipeline with its retrieved results eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | The pipeline ID. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The pipeline with chunk and image chunk retrieved results loaded, None if not found. |

get_with_summaries(pipeline_id)¶

Retrieve a pipeline with its summaries eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | The pipeline ID. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The pipeline with summaries loaded, None if not found. |

search_by_name(name_pattern)¶

Search pipelines by name pattern (case-insensitive).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name_pattern | str | The name pattern to search for (supports SQL LIKE wildcards). | required |

Returns:

| Type | Description |
|---|---|
| list[Any] | List of pipelines matching the pattern. |
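The SQL LIKE wildcards accepted by search_by_name ('%' for any run of characters, '_' for a single character) can be sketched as a regex translation. This is an illustration of the pattern semantics, not the repository's implementation:

```python
import re

# Translate a SQL LIKE pattern into a case-insensitive regex:
# '%' -> '.*', '_' -> '.', everything else matched literally.
def like_to_regex(pattern: str) -> re.Pattern:
    escaped = re.escape(pattern).replace("%", ".*").replace("_", ".")
    return re.compile(f"^{escaped}$", re.IGNORECASE)

names = ["bm25_v1", "bm25_v2", "Dense_v1"]
rx = like_to_regex("bm25%")
hits = [n for n in names if rx.match(n)]
```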
QueryRepository¶

Bases: BaseVectorRepository[Any], BaseEmbeddingRepository[Any]

Repository for Query entity with relationship loading and vector search capabilities.

__init__(session, model_cls=None)¶

Initialize query repository.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session | Session | SQLAlchemy session for database operations. | required |
| model_cls | type \| None | The Query model class to use. If None, uses default schema. | None |

count_all()¶

Count total number of queries.

Returns:

| Type | Description |
|---|---|
| int | Total count of queries. |

count_by_generation_gt_size(size)¶

Count queries with a specific number of generation ground truths.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| size | int | The number of ground truths to match. | required |

Returns:

| Type | Description |
|---|---|
| int | Count of queries with the specified number of ground truths. |

find_by_contents(contents)¶

Find a query by exact text content match. If multiple queries have the same content, returns the first one found.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| contents | str | The exact query text content to find. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The first matching query if found, None otherwise. |

get_all_ids(limit, offset=0)¶

Get all query IDs with pagination.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of query IDs to return. | required |
| offset | int | Number of query IDs to skip. | 0 |

Returns:

| Type | Description |
|---|---|
| list[int \| str] | List of query IDs ordered by ID. |
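The limit/offset contract of get_all_ids is what enables the generator-based pagination the services use. A minimal sketch of that pattern, where the fetch_page callable is a hypothetical stand-in for a repository call:

```python
from typing import Callable, Iterator

def iter_id_batches(fetch_page: Callable[[int, int], list[int]],
                    batch_size: int) -> Iterator[list[int]]:
    """Yield ID batches until an empty page signals exhaustion."""
    offset = 0
    while True:
        page = fetch_page(batch_size, offset)  # mirrors get_all_ids(limit, offset)
        if not page:
            return
        yield page
        offset += batch_size

# Simulated table of 7 query IDs, paged 3 at a time.
ids = list(range(1, 8))
batches = list(iter_id_batches(lambda limit, off: ids[off:off + limit], 3))
```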
get_by_query_text(query_text)¶

Retrieve a query by its text content.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query_text | str | The query text to search for. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The query if found, None otherwise. |

get_queries_with_empty_content(limit=100)¶

Retrieve queries with empty or whitespace-only content.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of queries to retrieve. | 100 |

Returns:

| Type | Description |
|---|---|
| list[Any] | List of queries with empty content. |

get_queries_with_generation_gt()¶

Retrieve all queries that have generation ground truth.

Returns:

| Type | Description |
|---|---|
| list[Any] | List of queries with generation ground truth. |

get_with_all_relations(query_id)¶

Retrieve a query with all relations eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query_id | int \| str | The query ID. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The query with all relations loaded, None if not found. |

get_with_executor_results(query_id)¶

Retrieve a query with its executor results eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query_id | int \| str | The query ID. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The query with executor results loaded, None if not found. |

get_with_retrieval_relations(query_id)¶

Retrieve a query with its retrieval relations eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query_id | int \| str | The query ID. | required |

Returns:

| Type | Description |
|---|---|
| Any \| None | The query with retrieval relations loaded, None if not found. |

search_by_query_text(search_text, limit=10)¶

Search queries containing the specified text.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| search_text | str | Text to search for in query content. | required |
| limit | int | Maximum number of results to return. | 10 |

Returns:

| Type | Description |
|---|---|
| list[Any] | List of queries containing the search text. |
SummaryRepository¶

Bases: GenericRepository[Summary]

Repository for Summary entity with composite key support.

__init__(session)¶

Initialize summary repository.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session | Session | SQLAlchemy session for database operations. | required |

compare_pipelines_by_metric(pipeline_ids, metric_id)¶

Compare multiple pipelines on a specific metric.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_ids | list[int] | List of pipeline IDs to compare. | required |
| metric_id | int | The metric ID to compare on. | required |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries for the specified pipelines and metric, ordered by metric result. |

delete_by_composite_key(pipeline_id, metric_id)¶

Delete a summary by its composite primary key.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int | The pipeline ID. | required |
| metric_id | int | The metric ID. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the summary was deleted, False if not found. |

exists_by_composite_key(pipeline_id, metric_id)¶

Check if a summary exists with the given composite key.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int | The pipeline ID. | required |
| metric_id | int | The metric ID. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the summary exists, False otherwise. |

get_by_composite_key(pipeline_id, metric_id)¶

Retrieve a summary by its composite primary key.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int | The pipeline ID. | required |
| metric_id | int | The metric ID. | required |

Returns:

| Type | Description |
|---|---|
| Summary \| None | The summary if found, None otherwise. |
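A Summary row is identified by the (pipeline_id, metric_id) pair rather than a single surrogate ID. The lookup semantics can be sketched with a tuple-keyed dict (illustrative only; the repository issues a primary-key query):

```python
# Summaries keyed by their composite primary key (pipeline_id, metric_id).
summaries = {
    (1, 1): {"metric_result": 0.82},
    (1, 2): {"metric_result": 0.64},
    (2, 1): {"metric_result": 0.79},
}

def get_by_composite_key(pipeline_id: int, metric_id: int):
    # Returns the summary if found, None otherwise.
    return summaries.get((pipeline_id, metric_id))

found = get_by_composite_key(1, 2)
missing = get_by_composite_key(3, 1)
```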
get_by_execution_time_range(pipeline_id, min_time, max_time)¶

Retrieve summaries within an execution time range.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int | The pipeline ID. | required |
| min_time | int | Minimum execution time (inclusive). | required |
| max_time | int | Maximum execution time (inclusive). | required |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries within the specified time range. |

get_by_metric_id(metric_id)¶

Retrieve all summaries for a specific metric.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metric_id | int | The metric ID. | required |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries for the metric. |

get_by_metric_result_range(metric_id, min_result, max_result)¶

Retrieve summaries within a metric result range.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metric_id | int | The metric ID. | required |
| min_result | float | Minimum metric result value (inclusive). | required |
| max_result | float | Maximum metric result value (inclusive). | required |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries within the specified range. |

get_by_pipeline_id(pipeline_id)¶

Retrieve all summaries for a specific pipeline.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int | The pipeline ID. | required |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries for the pipeline. |

get_metric_summaries_with_relations(metric_id)¶

Retrieve all summaries for a metric with relations eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metric_id | int | The metric ID. | required |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries with pipeline and metric loaded. |

get_pipeline_summaries_with_relations(pipeline_id)¶

Retrieve all summaries for a pipeline with relations eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int | The pipeline ID. | required |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries with pipeline and metric loaded. |

get_top_pipelines_by_metric(metric_id, limit=10, ascending=False)¶

Retrieve the top-performing pipelines for a specific metric.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metric_id | int | The metric ID. | required |
| limit | int | Maximum number of results to return. | 10 |
| ascending | bool | If True, sort ascending (lower is better); otherwise descending (higher is better). | False |

Returns:

| Type | Description |
|---|---|
| list[Summary] | List of summaries ordered by metric result. |
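The ranking semantics of get_top_pipelines_by_metric can be modeled on plain dicts: sort by metric result, with direction controlled by ascending, then truncate to limit. A sketch under those assumptions:

```python
summaries = [
    {"pipeline_id": 1, "metric_result": 0.82},
    {"pipeline_id": 2, "metric_result": 0.79},
    {"pipeline_id": 3, "metric_result": 0.91},
]

def top_by_metric(rows, limit=10, ascending=False):
    # Descending by default: higher metric result ranks first.
    ranked = sorted(rows, key=lambda r: r["metric_result"], reverse=not ascending)
    return ranked[:limit]

best_two = top_by_metric(summaries, limit=2)               # higher is better
worst = top_by_metric(summaries, limit=1, ascending=True)  # e.g. latency-style metrics
```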
get_with_all_relations(pipeline_id, metric_id)¶

Retrieve a summary with all relations eagerly loaded.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int | The pipeline ID. | required |
| metric_id | int | The metric ID. | required |

Returns:

| Type | Description |
|---|---|
| Summary \| None | The summary with all relations loaded, None if not found. |
TextOnlyUnitOfWork¶

Bases: BaseUnitOfWork

Text-only Unit of Work for managing text data ingestion transactions.

This UoW focuses on text-based entities only (Query, Chunk, RetrievalRelation) and excludes image-related tables like ImageChunk. Provides lazy-initialized repositories for efficient resource usage.

chunks property¶

Get the Chunk repository.

Returns:

| Type | Description |
|---|---|
| ChunkRepository | ChunkRepository instance. |

Raises:

| Type | Description |
|---|---|
| SessionNotSetError | If session is not initialized. |

queries property¶

Get the Query repository.

Returns:

| Type | Description |
|---|---|
| QueryRepository | QueryRepository instance. |

Raises:

| Type | Description |
|---|---|
| SessionNotSetError | If session is not initialized. |

retrieval_relations property¶

Get the RetrievalRelation repository.

Returns:

| Type | Description |
|---|---|
| RetrievalRelationRepository | RetrievalRelationRepository instance. |

Raises:

| Type | Description |
|---|---|
| SessionNotSetError | If session is not initialized. |

__init__(session_factory, schema=None)¶

Initialize the text-only Unit of Work with a session factory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session_factory | sessionmaker | SQLAlchemy sessionmaker instance. | required |
| schema | Any \| None | Schema namespace from create_schema(). If None, uses default 768-dim schema. | None |
UnitOfWork¶

Unit of Work pattern for managing database transactions.

Ensures data consistency by grouping multiple repository operations into a single atomic transaction.

__enter__()¶

Enter the context manager and create a new session.

Returns:

| Type | Description |
|---|---|
| UnitOfWork | Self for method chaining. |

__exit__(exc_type, exc_val, exc_tb)¶

Exit the context manager and clean up the session. Automatically rolls back if an exception occurred.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| exc_type | Any | Exception type if an error occurred. | required |
| exc_val | Any | Exception value if an error occurred. | required |
| exc_tb | Any | Exception traceback if an error occurred. | required |

__init__(session_factory)¶

Initialize the Unit of Work with a session factory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session_factory | Any | SQLAlchemy sessionmaker instance. | required |

commit()¶

Commit the current transaction.

flush()¶

Flush pending changes without committing.

rollback()¶

Rollback the current transaction.
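The rollback-on-exception contract of __exit__ can be sketched with a minimal in-memory stand-in (illustrative only; the real class wraps a SQLAlchemy session):

```python
class MiniUnitOfWork:
    """Minimal Unit of Work: explicit commit, rollback on exception."""
    def __init__(self):
        self.committed = False
        self.rolled_back = False

    def __enter__(self):
        return self  # self for method chaining, as in UnitOfWork.__enter__

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.rollback()  # automatic rollback on error
        return False  # never swallow the exception

    def commit(self):
        self.committed = True

    def rollback(self):
        self.rolled_back = True

uow = MiniUnitOfWork()
try:
    with uow:
        raise RuntimeError("simulated failure mid-transaction")
except RuntimeError:
    pass
```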
Service¶

autorag_research.orm.service¶

BaseEvaluationService¶

Bases: BaseService, ABC

Abstract base class for evaluation services.

Provides common patterns for evaluation workflows:

1. Fetch execution results in batches using a Generator (abstract)
2. Filter missing query IDs that need evaluation (abstract)
3. Compute metrics with batch processing (base)
4. Save evaluation results (abstract)

The service supports:

- Setting and changing metric functions dynamically
- Batch processing with configurable batch size
- Generator-based pagination to minimize memory usage and transaction issues
Example
service = RetrievalEvaluationService(session_factory, schema)
# Set metric and evaluate
service.set_metric(metric_id=1, metric_func=my_metric_func)
service.evaluate(pipeline_id=1, batch_size=100)
# Change metric and evaluate again
service.set_metric(metric_id=2, metric_func=another_metric_func)
service.evaluate(pipeline_id=1, batch_size=100)
metric_func property¶

Get current metric function.

metric_id property¶

Get current metric ID.

__init__(session_factory, schema=None)¶

Initialize the evaluation service.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session_factory | sessionmaker[Session] | SQLAlchemy sessionmaker for database connections. | required |
| schema | Any \| None | Schema namespace from create_schema(). If None, uses default 768-dim schema. | None |
evaluate(pipeline_id, batch_size=100)¶

Run the full evaluation pipeline for the current metric.

This method uses Generator-based pagination to process query IDs:

1. Iterates through query ID batches using limit/offset
2. Filters to only those missing evaluation results
3. Fetches execution results for the batch
4. Computes metrics in batch
5. Saves results to the database

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | The pipeline ID to evaluate. | required |
| batch_size | int | Number of queries to process per batch. | 100 |

Returns:

| Type | Description |
|---|---|
| tuple[int, float \| None] | Tuple of (queries_evaluated, average_score); average_score is None if no queries were evaluated. |

Raises:

| Type | Description |
|---|---|
| ValueError | If metric is not set. |
get_metric(metric_name, metric_type=None)¶

Get metric by name and optionally type.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metric_name | str | The name of the metric. | required |
| metric_type | str \| None | Optional metric type filter ('retrieval' or 'generation'). | None |

Returns:

| Type | Description |
|---|---|
| Any \| None | The Metric entity if found, None otherwise. |

get_or_create_metric(name, metric_type)¶

Get existing metric or create a new one.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The metric name. | required |
| metric_type | str | The metric type ('retrieval' or 'generation'). | required |

Returns:

| Type | Description |
|---|---|
| int \| str | The metric ID. |

is_evaluation_complete(pipeline_id, metric_id, batch_size=100)¶

Check if evaluation is complete for all queries.

Iterates through all query IDs and checks that:

1. Each query has execution results
2. Each query has evaluation results for the given pipeline and metric

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | The pipeline ID to check. | required |
| metric_id | int \| str | The metric ID to check. | required |
| batch_size | int | Number of queries to check per batch. | 100 |

Returns:

| Type | Description |
|---|---|
| bool | True if all queries have both execution and evaluation results; False otherwise (returns immediately on the first missing result). |
set_metric(metric_id, metric_func, compute_granularity='query')¶

Set the metric ID and function for evaluation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metric_id | int \| str | The ID of the metric in the database. | required |
| metric_func | MetricFunc | Function that takes list[MetricInput] and returns list[float \| None]. | required |
| compute_granularity | Literal['query', 'dataset'] | Metric compute granularity ("query" or "dataset"). | 'query' |

verify_pipeline_completion(pipeline_id, batch_size=100)¶

Verify all queries have execution results for the pipeline.

Iterates through query IDs in batches and checks each batch has results. Returns False immediately when any query is missing results.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | The pipeline ID to verify. | required |
| batch_size | int | Number of queries to check per batch. | 100 |

Returns:

| Type | Description |
|---|---|
| bool | True if all queries have results, False otherwise. |

Raises:

| Type | Description |
|---|---|
| NoQueryInDBError | If no queries exist in the database. |
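The early-exit batch check used by verify_pipeline_completion can be sketched without a database; completed_ids below is a hypothetical stand-in for the set of queries that already have stored results:

```python
def verify_completion(query_ids, completed_ids, batch_size=100):
    """Return True only if every query ID appears in completed_ids."""
    for start in range(0, len(query_ids), batch_size):
        batch = query_ids[start:start + batch_size]
        if any(qid not in completed_ids for qid in batch):
            return False  # early exit on the first missing result
    return True

all_ids = list(range(10))
complete = verify_completion(all_ids, set(range(10)), batch_size=4)
incomplete = verify_completion(all_ids, set(range(9)), batch_size=4)
```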
BasePipelineService¶

Bases: BaseService, ABC

Abstract base for pipeline services with shared pipeline management.

Provides:

- get_or_create_pipeline(): Idempotent pipeline creation with resume support
- get_pipeline_config(): Pipeline config retrieval by ID

Subclasses must implement _create_uow() and _get_schema_classes() as required by BaseService. The UoW returned by _create_uow() must expose a pipelines property (PipelineRepository).

get_or_create_pipeline(name, config, *, strict=False)¶

Get existing pipeline by name or create a new one.

If a pipeline with the given name already exists, returns its ID. If the existing pipeline has a different config, behavior depends on strict:

- strict=False (default): logs a warning and reuses the existing pipeline.
- strict=True: raises ValueError.

If no pipeline exists, creates a new one.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for this pipeline (used as experiment identifier). | required |
| config | dict | Configuration dictionary for the pipeline. | required |
| strict | bool | When True, raise ValueError on config mismatch instead of warning. | False |

Returns:

| Type | Description |
|---|---|
| tuple[int \| str, bool] | Tuple of (pipeline_id, is_new), where is_new is True if a new pipeline was created. |

Raises:

| Type | Description |
|---|---|
| ValueError | If strict=True and an existing pipeline has a different config. |
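The idempotent get-or-create behavior, including the strict flag, can be sketched with an in-memory registry (illustrative only; registry stands in for the pipelines table):

```python
# name -> (pipeline_id, config)
registry = {}

def get_or_create_pipeline(name, config, *, strict=False):
    if name in registry:
        existing_id, existing_config = registry[name]
        if existing_config != config and strict:
            raise ValueError(f"config mismatch for pipeline {name!r}")
        return existing_id, False  # reused; the real service logs a warning
    new_id = len(registry) + 1
    registry[name] = (new_id, config)
    return new_id, True

pid, created = get_or_create_pipeline("bm25_v1", {"type": "bm25"})
pid2, created2 = get_or_create_pipeline("bm25_v1", {"type": "bm25"})
```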
get_pipeline_config(pipeline_id)¶

Get pipeline configuration by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | ID of the pipeline. | required |

Returns:

| Type | Description |
|---|---|
| dict[Any, Any] \| None | Pipeline config dict if found, None otherwise. |
GenerationEvaluationService¶

Bases: BaseEvaluationService

Service for evaluating generation pipelines.

This service handles the evaluation workflow for generation pipelines:

1. Fetch queries and ground truth (Query.generation_gt)
2. Fetch generation results (ExecutorResult.generation_result)
3. Compute evaluation metrics (e.g., BLEU, ROUGE, F1)
4. Store results in the EvaluationResult table

The service uses MetricInput to pass data to metric functions, which should accept list[MetricInput] and return list[float | None].
Example
from autorag_research.orm.service import GenerationEvaluationService
# Create service
service = GenerationEvaluationService(session_factory, schema)
# Get or create metric
metric_id = service.get_or_create_metric("bleu", "generation")
# Set metric and evaluate
service.set_metric(metric_id=metric_id, metric_func=bleu_score)
count, avg = service.evaluate(pipeline_id=1, batch_size=100)
print(f"Evaluated {count} queries, average={avg}")
GenerationPipelineService¶

Bases: BasePipelineService

Service for running generation pipelines.

This service handles the common workflow for all generation pipelines:

1. Create a pipeline instance
2. Fetch queries from the database
3. Run generation using the provided function (which handles retrieval internally)
4. Store results in the ExecutorResult table

The actual generation logic (including retrieval) is provided as a function parameter, making this service reusable for NaiveRAG, iterative RAG, etc.
Example
from autorag_research.orm.service.generation_pipeline import GenerationPipelineService
# Create service
service = GenerationPipelineService(session_factory, schema)
# Create or resume pipeline
pipeline_id, is_new = service.get_or_create_pipeline(
name="naive_rag_v1",
config={"type": "naive_rag", "llm_model": "gpt-4"},
)
# Run pipeline with async generation function
results = service.run_pipeline(
generate_func=my_async_generate_func, # Async: handles retrieval + generation
pipeline_id=pipeline_id,
top_k=10,
)
delete_pipeline_results(pipeline_id)¶

Delete all generation results for a specific pipeline.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_id | int \| str | ID of the pipeline. | required |

Returns:

| Type | Description |
|---|---|
| int | Number of deleted records. |

get_chunk_contents(chunk_ids)¶

Get chunk contents by IDs.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| chunk_ids | list[int \| str] | List of chunk IDs to fetch. | required |

Returns:

| Type | Description |
|---|---|
| list[str] | List of chunk content strings in the same order as input IDs. |

get_image_chunk_contents(image_chunk_ids)¶

Fetch image chunk contents (bytes, mimetype) by IDs.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| image_chunk_ids | list[int \| str] | List of image chunk IDs. | required |

Returns:

| Type | Description |
|---|---|
| list[tuple[bytes, str]] | List of (image_bytes, mimetype) tuples in the same order as the input IDs. Missing chunks return (b"", "image/png"). |

get_query_text(query_id)¶

Get the text of a query by its ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query_id | int \| str | The ID of the query. | required |

Returns:

| Type | Description |
|---|---|
| str | The text of the query. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the query with the given ID is not found. |
run_pipeline(generate_func, pipeline_id, top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)¶

Run the generation pipeline for all queries with parallel execution and retry.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| generate_func | GenerateFunc | Async function that performs retrieval + generation. Signature: async (query_id: int, top_k: int) -> GenerationResult. The function should handle retrieval internally. | required |
| pipeline_id | int \| str | ID of the pipeline. | required |
| top_k | int | Number of top documents to retrieve per query. | 10 |
| batch_size | int | Number of queries to fetch from the DB at once. | 128 |
| max_concurrency | int | Maximum number of concurrent async operations. | 16 |
| max_retries | int | Maximum number of retry attempts for failed queries. | 3 |
| retry_delay | float | Base delay in seconds for exponential backoff between retries. | 1.0 |
| query_limit | int \| None | Maximum number of queries to process. None means no limit. | None |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with pipeline execution statistics. |
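run_pipeline retries failed queries with exponential backoff driven by retry_delay and max_retries. A sketch of one plausible delay schedule; the doubling base is an assumption, and the exact wait/jitter behavior is determined by the service's tenacity configuration:

```python
def backoff_delays(retry_delay: float, max_retries: int) -> list[float]:
    """Delay before each retry attempt: retry_delay * 2**attempt."""
    return [retry_delay * (2 ** attempt) for attempt in range(max_retries)]

# With the documented defaults (retry_delay=1.0, max_retries=3).
delays = backoff_delays(retry_delay=1.0, max_retries=3)
```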
MultiModalIngestionService¶

Bases: BaseIngestionService

Service for multi-modal data ingestion operations.

This service provides batch-only methods for ingesting multi-modal RAG datasets. Users can access repositories directly via the UoW for basic CRUD operations.

Design principles:

- Batch-only methods (no single-add methods)
- No simple wrappers around repository functions
- Value-added operations with transaction management and validation
- Mixed multi-hop support for retrieval ground truth
Example
from autorag_research.orm.connection import DBConnection
from autorag_research.orm.service import MultiModalIngestionService
# Setup database connection
db = DBConnection.from_config() # or DBConnection.from_env()
session_factory = db.get_session_factory()
# Initialize service
service = MultiModalIngestionService(session_factory)
# Read image file as bytes
with open("/path/to/image1.jpg", "rb") as f:
image_bytes = f.read()
# Batch add files
file_ids = service.add_files([
{"path": "/path/to/image1.jpg", "file_type": "image"},
{"path": "/path/to/document1.pdf", "file_type": "raw"},
])
add_documents(documents)¶

Batch add documents to the database.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| documents | list[dict] | List of dicts with keys: filename (str \| None), title (str \| None), author (str \| None), filepath_id (int \| None, FK to File), metadata (dict \| None, JSONB metadata). | required |

Returns:

| Type | Description |
|---|---|
| list[int \| str] | List of created Document IDs. |

add_files(files)¶

Batch add files to the database.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| files | list[dict[str, str]] | List of dicts with keys (path, file_type). file_type can be: "raw", "image", "audio", "video". | required |

Returns:

| Type | Description |
|---|---|
| list[int \| str] | List of created File IDs. |

add_image_chunks(image_chunks)¶

Batch add image chunks to the database.

Uses memory-efficient bulk insert (SQLAlchemy Core) instead of ORM objects. This reduces memory usage by roughly 3-5x for large batches.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| image_chunks | list[dict[str, bytes \| str \| int \| None]] | List of dicts with keys (content, mimetype, parent_page_id). content: image binary data (required); mimetype: image MIME type, e.g. "image/png" (required); parent_page_id: FK to Page (optional). | required |

Returns:

| Type | Description |
|---|---|
| list[int \| str] | List of created ImageChunk IDs. |

add_pages(pages)¶

Batch add pages to the database.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pages | list[dict] | List of dicts with keys: document_id (int, FK to Document, required), page_num (int, required), image_content (bytes \| None), mimetype (str \| None, e.g. "image/png"), metadata (dict \| None, JSONB metadata). | required |

Returns:

| Type | Description |
|---|---|
| list[int \| str] | List of created Page IDs. |
embed_all_image_chunks(embed_func, batch_size=100, max_concurrency=10)¶

Embed all image chunks that don't have embeddings.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| embed_func | ImageEmbeddingFunc | Async function that takes image bytes and returns an embedding vector. | required |
| batch_size | int | Number of image chunks to process per batch. | 100 |
| max_concurrency | int | Maximum concurrent embedding calls. | 10 |

Returns:

| Type | Description |
|---|---|
| int | Total number of image chunks successfully embedded. |

embed_all_image_chunks_multi_vector(embed_func, batch_size=100, max_concurrency=10)¶

Embed all image chunks that don't have multi-vector embeddings.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| embed_func | ImageMultiVectorEmbeddingFunc | Async function that takes image bytes and returns a multi-vector embedding. | required |
| batch_size | int | Number of image chunks to process per batch. | 100 |
| max_concurrency | int | Maximum concurrent embedding calls. | 10 |

Returns:

| Type | Description |
|---|---|
| int | Total number of image chunks successfully embedded. |

get_statistics()¶

Get statistics about the ingested data.

Returns:

| Type | Description |
|---|---|
| dict | Dictionary with counts for all entity types and embedding status. |
set_image_chunk_embeddings(image_chunk_ids, embeddings)¶

Batch set embeddings for image chunks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| image_chunk_ids | list[int \| str] | List of image chunk IDs. | required |
| embeddings | list[list[float]] | List of embedding vectors (must match image_chunk_ids in length). | required |

Returns:

| Type | Description |
|---|---|
| int | Total number of image chunks successfully updated. |

Raises:

| Type | Description |
|---|---|
| LengthMismatchError | If image_chunk_ids and embeddings have different lengths. |
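The LengthMismatchError contract amounts to a pre-flight check before any rows are touched. A sketch of that validation (LengthMismatchError is the library's exception; a plain ValueError stands in here, and the dict stands in for the bulk UPDATE):

```python
def set_embeddings(ids: list, embeddings: list) -> int:
    """Pair IDs with vectors, rejecting mismatched batch sizes up front."""
    if len(ids) != len(embeddings):
        # The real service raises LengthMismatchError here.
        raise ValueError(f"{len(ids)} ids vs {len(embeddings)} embeddings")
    updated = dict(zip(ids, embeddings))  # stands in for the bulk UPDATE
    return len(updated)

count = set_embeddings([1, 2], [[0.1, 0.2], [0.3, 0.4]])
```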
set_image_chunk_multi_embeddings(image_chunk_ids, embeddings)¶

Batch set multi-vector embeddings for image chunks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| image_chunk_ids | list[int \| str] | List of image chunk IDs. | required |
| embeddings | list[list[list[float]]] | List of multi-vector embeddings (one list of vectors per image chunk). | required |

Returns:

| Type | Description |
|---|---|
| int | Total number of image chunks successfully updated. |

Raises:

| Type | Description |
|---|---|
| LengthMismatchError | If image_chunk_ids and embeddings have different lengths. |
RetrievalEvaluationService¶

Bases: BaseEvaluationService

Service for evaluating retrieval pipelines.

This service handles the evaluation workflow for retrieval pipelines:

1. Fetch queries and ground truth (RetrievalRelation)
2. Fetch retrieval results (ChunkRetrievedResult)
3. Compute evaluation metrics (e.g., Recall@K, Precision@K, MRR)
4. Store results in the EvaluationResult table

The service uses MetricInput to pass data to metric functions, which should accept list[MetricInput] and return list[float | None].
Example
from autorag_research.orm.service import RetrievalEvaluationService
from autorag_research.evaluation.metrics.retrieval import retrieval_recall
# Create service
service = RetrievalEvaluationService(session_factory, schema)
# Get or create metric
metric_id = service.get_or_create_metric("recall@10", "retrieval")
# Set metric and evaluate
service.set_metric(metric_id=metric_id, metric_func=retrieval_recall)
count, avg = service.evaluate(pipeline_id=1, batch_size=100)
print(f"Evaluated {count} queries, average={avg}")
# Evaluate another metric
metric_id_2 = service.get_or_create_metric("precision@10", "retrieval")
service.set_metric(metric_id=metric_id_2, metric_func=retrieval_precision)
service.evaluate(pipeline_id=1)
RetrievalPipelineService
¶
Bases: BasePipelineService
Service for running retrieval pipelines.
This service handles the common workflow for all retrieval pipelines:
1. Create a pipeline instance
2. Fetch queries from database
3. Run retrieval using the provided retrieval function
4. Store results in ChunkRetrievedResult table
The actual retrieval logic is provided as a function parameter, making this service reusable for BM25, dense retrieval, hybrid, etc.
Example
from autorag_research.orm.service import RetrievalPipelineService
# Create service
service = RetrievalPipelineService(session_factory, schema)
# Direct search (for single-query use cases)
results = service.bm25_search(query_ids=[1, 2, 3], top_k=10)
results = service.vector_search(query_ids=[1, 2, 3], top_k=10)
# Or use run_pipeline for batch processing with result persistence
pipeline_id, is_new = service.get_or_create_pipeline(
name="bm25",
config={"type": "bm25", "tokenizer": "bert"},
)
stats = service.run_pipeline(
retrieval_func=lambda ids, k: service.bm25_search(ids, k),
pipeline_id=pipeline_id,
top_k=10,
)
bm25_search(query_ids, top_k=10, tokenizer='bert', index_name='idx_chunk_bm25')
¶
Execute BM25 retrieval for given query IDs.
Uses VectorChord-BM25 full-text search on the chunks table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_ids
|
list[int | str]
|
List of query IDs to search for. |
required |
top_k
|
int
|
Number of top results to return per query. |
10
|
tokenizer
|
str
|
Tokenizer to use for BM25 (default: "bert"). |
'bert'
|
index_name
|
str
|
Name of the BM25 index (default: "idx_chunk_bm25"). |
'idx_chunk_bm25'
|
Returns:
| Type | Description |
|---|---|
list[list[dict[str, Any]]]
|
List of result lists, one per query. Each result dict contains doc_id, score, and content. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If a query ID is not found in the database. |
bm25_search_by_text(query_text, top_k=10, tokenizer='bert', index_name='idx_chunk_bm25')
¶
Execute BM25 retrieval using raw query text (no Query entity needed).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_text
|
str
|
The query text to search for. |
required |
top_k
|
int
|
Number of top results to return. |
10
|
tokenizer
|
str
|
Tokenizer to use for BM25 (default: "bert"). |
'bert'
|
index_name
|
str
|
Name of the BM25 index (default: "idx_chunk_bm25"). |
'idx_chunk_bm25'
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of result dicts containing doc_id, score, and content. |
delete_pipeline_results(pipeline_id)
¶
Delete all retrieval results for a specific pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pipeline_id
|
int | str
|
ID of the pipeline. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted records. |
fetch_query_texts(query_ids)
¶
Batch fetch query texts from database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_ids
|
list[int | str]
|
List of query IDs to fetch. |
required |
Returns:
| Type | Description |
|---|---|
list[str]
|
List of query text contents in the same order as query_ids. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If a query ID is not found. |
find_query_by_text(query_text)
¶
Find existing query by text content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_text
|
str
|
The query text to search for. |
required |
Returns:
| Type | Description |
|---|---|
Any | None
|
The query if found, None otherwise. |
run_pipeline(retrieval_func, pipeline_id, top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)
¶
Run retrieval pipeline for all queries with parallel execution and retry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
retrieval_func
|
RetrievalFunc
|
Async function that performs retrieval for a single query. Signature: async (query_id: int | str, top_k: int) -> list[dict] Each result dict must have 'doc_id' (int) and 'score' keys. |
required |
pipeline_id
|
int | str
|
ID of the pipeline. |
required |
top_k
|
int
|
Number of top documents to retrieve per query. |
10
|
batch_size
|
int
|
Number of queries to fetch from DB at once. |
128
|
max_concurrency
|
int
|
Maximum number of concurrent async operations. |
16
|
max_retries
|
int
|
Maximum number of retry attempts for failed queries. |
3
|
retry_delay
|
float
|
Base delay in seconds for exponential backoff between retries. |
1.0
|
query_limit
|
int | None
|
Maximum number of queries to process. None means no limit. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with pipeline execution statistics. |
vector_search(query_ids, top_k=10, search_mode='single')
¶
Execute vector search for given query IDs.
Supports single-vector (cosine similarity) and multi-vector (MaxSim) search modes using VectorChord extension.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_ids
|
list[int | str]
|
List of query IDs to search for. |
required |
top_k
|
int
|
Number of top results to return per query. |
10
|
search_mode
|
Literal['single', 'multi']
|
"single" for dense retrieval, "multi" for late interaction. |
'single'
|
Returns:
| Type | Description |
|---|---|
list[list[dict[str, Any]]]
|
List of result lists, one per query. Each result dict contains doc_id, score, and content. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If a query ID is not found or lacks required embeddings. |
vector_search_by_embedding(embedding, top_k=10)
¶
Execute vector search using a provided embedding directly.
This method enables retrieval pipelines that generate embeddings dynamically (like HyDE) rather than using pre-computed query embeddings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
embedding
|
list[float]
|
The embedding vector to search with. |
required |
top_k
|
int
|
Number of top results to return. |
10
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of result dicts containing doc_id, score, and content. Score is cosine similarity in [-1, 1] range. |
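The documented [-1, 1] score range can be checked with a small standalone computation (plain Python, independent of the service and its database):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors; result lies in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))   # 1.0 (identical direction)
print(cosine_similarity([1.0, 0.0], [-1.0, 0.0]))  # -1.0 (opposite direction)
```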
TextDataIngestionService
¶
Bases: BaseIngestionService
Service for text-only data ingestion operations.
Provides methods for:
- Adding queries (with optional generation_gt)
- Adding chunks (text-only, no parent page required)
- Creating retrieval ground truth relations (with multi-hop support)
- Setting embeddings for queries and chunks (accepts pre-computed vectors)
Example
Basic usage with queries, chunks, and retrieval ground truth:
from autorag_research.orm.connection import DBConnection
from autorag_research.orm.service import TextDataIngestionService
# Setup database connection
db = DBConnection.from_config() # or DBConnection.from_env()
session_factory = db.get_session_factory()
# Initialize service
service = TextDataIngestionService(session_factory)
# Get statistics
stats = service.get_statistics()
print(stats)
clean()
¶
Delete empty queries and chunks along with their associated retrieval relations.
This method should be called after data ingestion and before embedding to remove any queries or chunks with empty or whitespace-only content. It also removes associated retrieval relations to maintain referential integrity.
Returns:
| Type | Description |
|---|---|
dict[str, int]
|
Dictionary with counts of deleted queries and chunks. |
get_retrieval_gt_by_query(query_id)
¶
Get all retrieval ground truth relations for a query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query_id
|
int | str
|
The query ID. |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of RetrievalRelation entities ordered by group_index and group_order. |
get_statistics()
¶
Get statistics about the ingested data.
Returns:
| Type | Description |
|---|---|
dict
|
Dictionary with counts of queries, chunks, and embeddings status. |
Utilities¶
autorag_research.util
¶
TokenUsageTracker
¶
Collects token usage from LangChain LLM responses.
Usage::
tracker = TokenUsageTracker()
tracker.record(response) # extract + store from LangChain response
result = tracker.total # aggregated total across all calls
per_call = tracker.history # per-call breakdown
history
property
¶
Per-call token usage breakdown (defensive copy).
total
property
¶
Aggregated total token usage across all recorded calls.
record(response)
¶
Extract and record token usage from a LangChain LLM response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
response
|
Any
|
LangChain LLM response object (AIMessage, etc.) |
required |
Returns:
| Type | Description |
|---|---|
dict[str, int] | None
|
Extracted usage dict, or None if not available. |
aggregate_token_usage(current, new)
¶
Aggregate two token usage dicts (accumulator pattern).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
current
|
dict[str, int] | None
|
Current aggregated token usage (or None). |
required |
new
|
dict[str, int] | None
|
New token usage to add (or None). |
required |
Returns:
| Type | Description |
|---|---|
dict[str, int] | None
|
Aggregated token usage dict, or None if both inputs are None. |
bytes_to_pil_image(image_bytes)
¶
Convert image bytes to PIL Image.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image_bytes
|
bytes
|
Raw image bytes (PNG, JPEG, etc.) |
required |
Returns:
| Type | Description |
|---|---|
Image
|
PIL Image object. |
convert_inputs_to_list(func)
¶
Decorator to convert all function inputs to Python lists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func
|
Callable
|
The function to be decorated. |
required |
Returns:
| Type | Description |
|---|---|
Callable
|
The wrapped function that converts all inputs to lists. |
extract_image_from_data_uri(data_uri)
¶
Extract image bytes and mimetype from a data URI.
extract_token_logprobs(response, target_tokens=None)
¶
Extract log probabilities from LangChain LLM response.
Works with any LangChain LLM that stores logprobs in response_metadata["logprobs"]["content"].
Compatible providers include:
- OpenAI (ChatOpenAI, AzureChatOpenAI)
- Together AI, Fireworks AI, Anyscale
- Local models via vLLM, text-generation-inference, Ollama (with logprobs enabled)
- Any OpenAI-compatible API endpoint
To enable logprobs, use provider-specific configuration:
- OpenAI/vLLM: llm.bind(logprobs=True, top_logprobs=5)
- Other providers: Check provider documentation for logprobs support
LangChain stores logprobs in response.response_metadata["logprobs"]["content"].
Each token entry has: token, logprob, bytes, top_logprobs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
response
|
Any
|
LangChain AIMessage or similar response object. |
required |
target_tokens
|
list[str] | None
|
If provided, only return logprobs for these tokens. Case-insensitive matching. If None, returns all token logprobs. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, float] | None
|
Dict mapping token strings to their log probability values. |
dict[str, float] | None
|
Returns None if logprobs not available in response. |
Example
# Enable logprobs on the LLM (OpenAI example)
llm = ChatOpenAI(model="gpt-4o-mini").bind(logprobs=True, top_logprobs=5)
response = llm.invoke("Answer Yes or No: Is the sky blue?")
logprobs = extract_token_logprobs(response, target_tokens=["Yes", "No"])
# Returns: {"Yes": -0.0001, "No": -9.2} or None if not available
Note
- log probability of 0.0 = 100% confidence
- More negative = less likely
- Convert to probability: exp(logprob)
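The conversion noted above is a one-liner; with the illustrative logprob values from the example:

```python
import math

logprobs = {"Yes": -0.0001, "No": -9.2}  # illustrative values only
probs = {token: math.exp(lp) for token, lp in logprobs.items()}
print(probs["Yes"])  # ~0.9999: near-certain
print(probs["No"])   # ~0.0001: very unlikely
```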
image_chunk_to_pil_images(image_chunks)
¶
Convert raw image bytes to PIL Images, skipping invalid ones.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image_chunks
|
list[tuple[bytes, str]]
|
List of (bytes, mimetype) tuples. It can be a result of the GET operation from the ImageChunk repository. |
required |
Returns:
| Type | Description |
|---|---|
list[Image]
|
List of valid PIL Images. |
load_image(img)
¶
Convert any ImageType to a PIL Image in RGB mode.
Accepts file paths (str/Path), raw bytes, or BytesIO objects and returns a PIL Image converted to RGB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
img
|
ImageType
|
Image as file path (str/Path), raw bytes, or BytesIO. |
required |
Returns:
| Type | Description |
|---|---|
Image
|
PIL Image in RGB mode. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If img is not a supported type. |
normalize_dbsf(scores)
¶
3-sigma distribution-based score fusion normalization.
Normalizes using mean ± 3*std as bounds, then clips to [0, 1]. This method is robust to outliers and works well when combining scores from different distributions. None values are preserved and excluded from statistics calculation.
Reference: "Score Normalization in Multi-Engine Text Retrieval"
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scores
|
list[float | None]
|
List of numeric scores to normalize. None values are preserved. |
required |
Returns:
| Type | Description |
|---|---|
list[float | None]
|
List of normalized scores clipped to [0, 1] range, with None preserved. |
Example
normalize_dbsf([1.0, 2.0, 3.0, 4.0, 5.0])
# [0.0, 0.25, 0.5, 0.75, 1.0] (approximately)
normalize_dbsf([1.0, None, 3.0, 4.0, 5.0])
# [0.0, None, 0.333..., 0.5, 0.666...] (approximately)
normalize_minmax(scores)
¶
Min-max normalization to [0, 1] range.
Scales scores linearly so that the minimum becomes 0 and maximum becomes 1. If all scores are equal, returns a list of 0.5 values. None values are preserved and excluded from statistics calculation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scores
|
list[float | None]
|
List of numeric scores to normalize. None values are preserved. |
required |
Returns:
| Type | Description |
|---|---|
list[float | None]
|
List of normalized scores in [0, 1] range, with None preserved. |
Example
normalize_minmax([1.0, 2.0, 3.0])
# [0.0, 0.5, 1.0]
normalize_minmax([1.0, None, 3.0])
# [0.0, None, 1.0]
normalize_string(s)
¶
Taken from the official evaluation script for v1.1 of the SQuAD dataset. Lower text and remove punctuation, articles, and extra whitespace.
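The SQuAD v1.1 normalization steps are well known (lowercase, strip punctuation, drop the articles a/an/the, collapse whitespace); a sketch matching that description:

```python
import re
import string

def normalize_string(s: str) -> str:
    """Lowercase, remove punctuation and articles, collapse whitespace."""
    s = s.lower()
    s = "".join(ch for ch in s if ch not in string.punctuation)
    s = re.sub(r"\b(a|an|the)\b", " ", s)
    return " ".join(s.split())

print(normalize_string("The Quick, Brown Fox!"))  # "quick brown fox"
```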
normalize_tmm(scores, theoretical_min)
¶
Theoretical min-max normalization using theoretical min and actual max.
Uses the theoretical minimum bound and actual maximum from the data. This is useful when the minimum is known (e.g., 0 for BM25) but the maximum varies per query. None values are preserved and excluded from statistics calculation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scores
|
list[float | None]
|
List of numeric scores to normalize. None values are preserved. |
required |
theoretical_min
|
float
|
Known minimum possible score (e.g., 0 for BM25, -1 for cosine). |
required |
Returns:
| Type | Description |
|---|---|
list[float | None]
|
List of normalized scores in [0, 1] range, with None preserved. |
Example
normalize_tmm([0.0, 50.0, 100.0], theoretical_min=0.0)
# [0.0, 0.5, 1.0]
normalize_tmm([0.0, None, 100.0], theoretical_min=0.0)
# [0.0, None, 1.0]
normalize_zscore(scores)
¶
Z-score standardization (mean=0, std=1).
Centers scores around mean and scales by standard deviation. If standard deviation is 0 (all scores equal), returns all zeros. None values are preserved and excluded from statistics calculation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scores
|
list[float | None]
|
List of numeric scores to normalize. None values are preserved. |
required |
Returns:
| Type | Description |
|---|---|
list[float | None]
|
List of z-score normalized values, with None preserved. |
Example
normalize_zscore([1.0, 2.0, 3.0])
# [-1.2247..., 0.0, 1.2247...]
normalize_zscore([1.0, None, 3.0])
# [-1.0, None, 1.0]
pil_image_to_bytes(image)
¶
Convert PIL image to bytes with mimetype.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image
|
Image
|
PIL Image object. |
required |
Returns:
| Type | Description |
|---|---|
tuple[bytes, str]
|
Tuple of (image_bytes, mimetype). |
pil_image_to_data_uri(image)
¶
Convert PIL Image to data URI for multi-modal LLMs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image
|
Image
|
PIL Image object. |
required |
Returns:
| Type | Description |
|---|---|
str
|
Data URI string (e.g., "data:image/png;base64,iVBORw0..."). |
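The data-URI shape shown above follows the standard scheme; a stdlib-only sketch of the encoding, using a hypothetical `bytes_to_data_uri` helper that operates on raw bytes rather than a PIL Image:

```python
import base64

def bytes_to_data_uri(image_bytes: bytes, mimetype: str = "image/png") -> str:
    """Build a data URI from raw image bytes (hypothetical helper for illustration)."""
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return f"data:{mimetype};base64,{encoded}"

uri = bytes_to_data_uri(b"\x89PNG fake bytes")
print(uri.startswith("data:image/png;base64,"))  # True
```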
run_with_concurrency_limit(items, async_func, max_concurrency, error_message='Task failed')
async
¶
Run async function on items with concurrency limit using semaphore.
A generic utility for running async operations with controlled concurrency.
Each item is processed by the async function, with at most max_concurrency
operations running simultaneously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Iterable[T]
|
Iterable of items to process. |
required |
async_func
|
Callable[[T], Awaitable[R]]
|
Async function that takes an item and returns a result. |
required |
max_concurrency
|
int
|
Maximum number of concurrent operations. |
required |
error_message
|
str
|
Message to log when an operation fails. |
'Task failed'
|
Returns:
| Type | Description |
|---|---|
list[R | None]
|
List of results (or None if failed) in same order as items. |
Example
async def embed_text(text: str) -> list[float]:
return await some_api_call(text)
texts = ["hello", "world", "test"]
embeddings = await run_with_concurrency_limit(
texts,
embed_text,
max_concurrency=5,
error_message="Failed to embed text",
)
to_async_func(func)
¶
Convert a synchronous function to an asynchronous function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func
|
Callable[..., R]
|
The synchronous function to convert. |
required |
Returns:
| Type | Description |
|---|---|
Callable[..., Awaitable[R]]
|
An asynchronous function that runs the synchronous function in a thread. |
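The thread-offloading wrapper described above can be sketched with asyncio.to_thread (an illustration under that assumption, not necessarily the library's implementation):

```python
import asyncio
import functools
from typing import Awaitable, Callable, TypeVar

R = TypeVar("R")

def to_async_func(func: Callable[..., R]) -> Callable[..., Awaitable[R]]:
    """Wrap a sync function so awaiting it runs the call in a worker thread."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs) -> R:
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper

def slow_add(a: int, b: int) -> int:
    return a + b  # stands in for blocking work (file I/O, a sync SDK call, etc.)

result = asyncio.run(to_async_func(slow_add)(2, 3))
print(result)  # 5
```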
to_list(item)
¶
Recursively convert collections to Python lists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
Any
|
The item to convert to a list. Can be numpy array, pandas Series, or any iterable collection. |
required |
Returns:
| Type | Description |
|---|---|
Any
|
The converted Python list. |
truncate_texts(str_list, max_tokens)
¶
Truncate each string in the list to a maximum number of tokens using tiktoken.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
str_list
|
list[str]
|
List of strings to be truncated. |
required |
max_tokens
|
int
|
Maximum number of tokens allowed per string. |
required |
Returns:
| Type | Description |
|---|---|
list[str]
|
List of truncated strings. |
unpack_and_run(target_list, func, *args, **kwargs)
¶
Unpack each sublist in target_list and run func with the unpacked arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
target_list
|
list[list[Any]]
|
List of sublists to be unpacked and processed. |
required |
func
|
Callable
|
Callable function to run on the flattened list. |
required |
*args
|
tuple
|
Additional positional arguments to pass to func. |
()
|
**kwargs
|
Any
|
Additional keyword arguments to pass to func. |
{}
|
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of results grouped by original sublist lengths. |
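Reading the description as flatten, run func once on the flat list, then regroup by the original sublist lengths, a sketch of that interpretation (an assumption, not the library's code):

```python
from typing import Any, Callable

def unpack_and_run(
    target_list: list[list[Any]], func: Callable, *args, **kwargs
) -> list[list[Any]]:
    """Flatten sublists, run func once on the flat list, regroup the results."""
    flat = [item for sublist in target_list for item in sublist]
    flat_results = func(flat, *args, **kwargs)
    grouped, cursor = [], 0
    for sublist in target_list:
        grouped.append(flat_results[cursor : cursor + len(sublist)])
        cursor += len(sublist)
    return grouped

def upper_all(texts: list[str]) -> list[str]:
    return [t.upper() for t in texts]

print(unpack_and_run([["a", "b"], ["c"]], upper_all))  # [['A', 'B'], ['C']]
```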
validate_plugin_name(name)
¶
Check whether name is a valid plugin name.
A valid plugin name starts with a lowercase letter and contains only lowercase letters, digits, and underscores. This guards against path traversal and code-injection when the name is interpolated into filesystem paths and Python source templates.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The candidate plugin name. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the name is a valid plugin name, False otherwise. |
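The rule above maps directly to a regular expression; a sketch of the check (the pattern is assumed from the description, not taken from the source):

```python
import re

# Assumed pattern: lowercase first letter, then only lowercase letters,
# digits, and underscores. Rejects path separators, dots, and uppercase.
_PLUGIN_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$")

def validate_plugin_name(name: str) -> bool:
    """Return True if name is safe to interpolate into paths and templates."""
    return bool(_PLUGIN_NAME_RE.fullmatch(name))

print(validate_plugin_name("my_plugin2"))  # True
print(validate_plugin_name("../escape"))   # False
print(validate_plugin_name("Bad_Name"))    # False
```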
autorag_research.exceptions
¶
DuplicateRetrievalGTError
¶
Bases: Exception
Raised when retrieval GT already exists for a query and upsert is False.
EmbeddingError
¶
Bases: Exception
Raised when the embedding model is not set.
EmptyIterableError
¶
Bases: Exception
Raised when an iterable is empty but should contain items.
EnvNotFoundError
¶
Bases: Exception
Raised when a required environment variable is not found.
EvaluationError
¶
ExecutorError
¶
Bases: Exception
Base exception for Executor errors.
HealthCheckError
¶
InvalidDatasetNameError
¶
Bases: NameError
Raised when an invalid dataset name is provided.
LLMError
¶
Bases: Exception
Raised when the LLM model is not set.
LengthMismatchError
¶
Bases: Exception
Raised when there is a length mismatch between two related lists.
LogprobsNotSupportedError
¶
MaxRetriesExceededError
¶
MissingDBNameError
¶
Bases: Exception
Raised when the database name is missing in the configuration.
MissingRequiredParameterError
¶
Bases: Exception
Raised when required parameters are missing.
NoQueryInDBError
¶
Bases: Exception
Raised when there are no queries in the database.
NoSessionError
¶
Bases: Exception
Raised when there is no active database session.
PipelineExecutionError
¶
PipelineVerificationError
¶
RepositoryNotSupportedError
¶
Bases: Exception
Raised when a repository is not supported by the current UoW.
RerankerError
¶
Bases: Exception
Raised when the reranker model fails.
SchemaNotFoundError
¶
Bases: Exception
Raised when a schema is not found.
ServiceNotSetError
¶
Bases: Exception
Raised when the service is not set.
SessionNotSetError
¶
Bases: Exception
Raised when the database session is not set.
UnsupportedDataSubsetError
¶
Bases: Exception
Raised when an unsupported data subset is requested.
UnsupportedLanguageError
¶
Bases: Exception
Raised when an unsupported language is specified.
UnsupportedMTEBTaskTypeError
¶
Bases: Exception
Raised when an unsupported MTEB task type is provided.