API Reference

Auto-generated from source code.

Configuration

autorag_research.config.ExecutorConfig dataclass

Configuration for the Executor.

Attributes:

Name Type Description
pipelines list[BasePipelineConfig]

List of pipeline configurations to run.

metrics list[BaseMetricConfig]

List of metric configurations to evaluate.

max_retries int

Maximum number of retry attempts for failed pipelines.

eval_batch_size int

Batch size for metric evaluation.

health_check_queries int

Number of queries to run during health check before full execution. Set to 0 to disable. Defaults to 2.

Example
config = ExecutorConfig(
    pipelines=[
        BM25PipelineConfig(name="bm25_v1", tokenizer="bert"),
    ],
    metrics=[
        RecallConfig(),
        NDCGConfig(),
    ],
    max_retries=3,
)

autorag_research.config.BasePipelineConfig dataclass

Bases: ABC

Base configuration for all pipelines.

Subclasses should define their specific configuration parameters as dataclass fields and implement the abstract methods.

Attributes:

Name Type Description
name str

Unique name for this pipeline instance.

description str

Optional description of the pipeline.

pipeline_type PipelineType

Type of pipeline (RETRIEVAL or GENERATION).

top_k int

Number of results to retrieve per query. Default: 10.

batch_size int

Number of queries to fetch from DB at once. Default: 128.

max_concurrency int

Maximum concurrent async operations (semaphore limit). Default: 16.

max_retries int

Maximum retry attempts for failed queries (uses tenacity). Default: 3.

retry_delay float

Base delay in seconds for exponential backoff between retries. Default: 1.0.

Example
@dataclass
class BM25PipelineConfig(BasePipelineConfig):
    tokenizer: str = "bert"
    index_name: str = "idx_chunk_bm25"
    pipeline_type: PipelineType = field(default=PipelineType.RETRIEVAL, init=False)

    def get_pipeline_class(self) -> Type:
        from autorag_research.pipelines.retrieval.bm25 import BM25RetrievalPipeline
        return BM25RetrievalPipeline

    def get_pipeline_kwargs(self) -> dict[str, Any]:
        return {"tokenizer": self.tokenizer, "index_name": self.index_name}
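
The max_retries and retry_delay fields above drive exponential backoff between retry attempts. A rough sketch of the resulting delays, assuming the common multiplier-times-2^attempt convention (the exact tenacity wait strategy is an implementation detail):

```python
def backoff_delays(retry_delay: float, max_retries: int) -> list[float]:
    """Delay before each retry attempt, doubling from the base delay."""
    return [retry_delay * (2 ** attempt) for attempt in range(max_retries)]

# With the defaults retry_delay=1.0 and max_retries=3,
# the waits between attempts are 1.0s, 2.0s, 4.0s.
delays = backoff_delays(1.0, 3)
```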

get_pipeline_class() abstractmethod

Return the pipeline class to instantiate.

Returns:

Type Description
type[BaseRetrievalPipeline]

The pipeline class type.

get_pipeline_kwargs() abstractmethod

Return kwargs for pipeline constructor.

These kwargs are passed to the pipeline constructor along with session_factory, name, and schema (which are handled by Executor).

Returns:

Type Description
dict[str, Any]

Dictionary of keyword arguments for the pipeline constructor.

get_run_kwargs() abstractmethod

Return kwargs for pipeline.run() method.

Returns:

Type Description
dict[str, Any]

Dictionary of keyword arguments for the run method.

autorag_research.config.BaseMetricConfig dataclass

Bases: ABC

Base configuration for all metrics.

Subclasses should define their specific configuration parameters as dataclass fields and implement the abstract methods.

Attributes:

Name Type Description
metric_type MetricType

Type of metric (RETRIEVAL or GENERATION).

Example
@dataclass
class RecallConfig(BaseMetricConfig):
    metric_type: MetricType = field(default=MetricType.RETRIEVAL, init=False)

    def get_metric_name(self) -> str:
        return "retrieval_recall"

    def get_metric_func(self) -> Callable:
        from autorag_research.evaluation.metrics import retrieval_recall
        return retrieval_recall

get_compute_granularity()

Return metric compute granularity.

Query-level metrics return one score per query and are computed batch-by-batch. Dataset-level metrics require all samples together to produce paper-aligned scores.

Returns:

Type Description
Literal['query', 'dataset']

"query" for per-query metrics (default), "dataset" for whole-dataset metrics.
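
The distinction can be seen with plain numbers: per-query scores can be averaged batch by batch without changing the result, while a dataset-level quantity must pool all samples first. An illustrative sketch (not library code; the micro-averaged precision here is a hypothetical example of a dataset-level metric):

```python
# Query-level: one score per query; batching does not change the mean.
scores = [1.0, 0.0, 0.5, 1.0]
batches = [scores[:2], scores[2:]]
overall = sum(scores) / len(scores)
batched = sum(sum(b) for b in batches) / sum(len(b) for b in batches)
assert overall == batched  # safe to compute batch-by-batch

# Dataset-level (hypothetical micro-averaged precision): numerators and
# denominators must be pooled over the whole dataset before dividing.
hits = [3, 1]        # relevant items retrieved, per batch
retrieved = [10, 2]  # total items retrieved, per batch
micro = sum(hits) / sum(retrieved)             # pooled: 4/12
mean_of_batch_precisions = (3/10 + 1/2) / 2    # 0.4, a different number
```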

get_metric_func() abstractmethod

Return the metric function.

Returns:

Type Description
Callable

The callable metric function.

get_metric_kwargs()

Return optional kwargs for the metric function.

Override this method if the metric function accepts additional arguments.

Returns:

Type Description
dict[str, Any]

Dictionary of keyword arguments for the metric function.

get_metric_name()

Return the metric name for database storage.

Returns:

Type Description
str

The metric name string.

Executor

autorag_research.executor.Executor

Orchestrates pipeline execution and metric evaluation.

The Executor coordinates:

1. Sequential execution of configured pipelines
2. Verification that all queries have results
3. Retry logic for failed pipelines
4. Metric evaluation for each pipeline (immediately after the pipeline completes)

Metric Evaluation Rules:

- Retrieval pipelines: only retrieval metrics are evaluated
- Generation pipelines: both retrieval AND generation metrics are evaluated
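
The rule can be sketched as a simple filter (illustrative only; the enum and field names mirror the config classes described in this reference):

```python
from enum import Enum

class PipelineType(Enum):
    RETRIEVAL = "retrieval"
    GENERATION = "generation"

class MetricType(Enum):
    RETRIEVAL = "retrieval"
    GENERATION = "generation"

def applicable_metrics(pipeline_type: PipelineType,
                       metric_types: list[MetricType]) -> list[MetricType]:
    """Retrieval pipelines get retrieval metrics only; generation gets both."""
    if pipeline_type is PipelineType.RETRIEVAL:
        return [m for m in metric_types if m is MetricType.RETRIEVAL]
    return list(metric_types)
```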

Example
from autorag_research.config import ExecutorConfig
from autorag_research.executor import Executor
from autorag_research.orm.connection import DBConnection
from autorag_research.pipelines.retrieval.bm25 import BM25PipelineConfig
from autorag_research.evaluation.metrics.retrieval import RecallConfig, NDCGConfig

db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

config = ExecutorConfig(
    pipelines=[
        BM25PipelineConfig(
            name="bm25_baseline",
            tokenizer="bert",
            top_k=10,
        ),
    ],
    metrics=[
        RecallConfig(),
        NDCGConfig(),
    ],
    max_retries=3,
)

executor = Executor(session_factory, config)
result = executor.run()

__init__(session_factory, config, schema=None, config_dir=None)

Initialize Executor.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy sessionmaker for database connections.

required
config ExecutorConfig

Executor configuration.

required
schema Any | None

Schema namespace from create_schema(). If None, uses default schema.

None
config_dir Path | None

Directory containing pipeline YAML configs. If None, attempts to use Hydra's config path if initialized, otherwise falls back to CWD/configs.

None

run()

Run all configured pipelines and evaluate metrics.

For each pipeline:

1. Resolve retrieval-pipeline dependencies
2. Run a health check (if enabled)
3. Run the pipeline with retry logic
4. Verify completion
5. Evaluate applicable metrics (before moving to the next pipeline)

Returns:

Type Description
ExecutorResult

ExecutorResult with comprehensive execution statistics.

Pipelines

Retrieval

autorag_research.pipelines.retrieval.base.BaseRetrievalPipeline

Bases: BasePipeline, ABC

Abstract base class for all retrieval pipelines.

This class provides common functionality for retrieval pipelines:

- Service initialization
- Pipeline creation in database
- Abstract retrieve methods for subclasses to implement

Subclasses must implement:

- _retrieve_by_id(): Async method for retrieval using query ID (query exists in DB)
- _retrieve_by_text(): Async method for retrieval using raw query text
- _get_pipeline_config(): Return the pipeline configuration dict

__init__(session_factory, name, schema=None)

Initialize retrieval pipeline.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy sessionmaker for database connections.

required
name str

Name for this pipeline.

required
schema Any | None

Schema namespace from create_schema(). If None, uses default schema.

None

retrieve(query_text, top_k=10) async

Retrieve chunks for a single query (async).

This method provides single-query retrieval, designed for use within GenerationPipeline where queries are processed one at a time.

Checks whether the query exists in the DB:

- If it exists: uses _retrieve_by_id() (faster; uses the stored embedding)
- If not: uses _retrieve_by_text() (may trigger embedding computation)

Parameters:

Name Type Description Default
query_text str

The query text to retrieve for.

required
top_k int

Number of chunks to retrieve.

10

Returns:

Type Description
list[dict[str, Any]]

List of dicts with 'doc_id' (chunk ID) and 'score' keys.

run(top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)

Run the retrieval pipeline.

Parameters:

Name Type Description Default
top_k int

Number of top documents to retrieve per query.

10
batch_size int

Number of queries to fetch from DB at once.

128
max_concurrency int

Maximum number of concurrent async operations.

16
max_retries int

Maximum number of retry attempts for failed queries.

3
retry_delay float

Base delay in seconds for exponential backoff between retries.

1.0
query_limit int | None

Maximum number of queries to process. None means no limit.

None

Returns:

Type Description
dict[str, Any]

Dictionary with pipeline execution statistics:

• pipeline_id: The pipeline ID
• total_queries: Number of queries processed successfully
• total_results: Number of results stored
• failed_queries: List of query IDs that failed after all retries

autorag_research.pipelines.retrieval.bm25.BM25RetrievalPipeline

Bases: BaseRetrievalPipeline

Pipeline for running VectorChord-BM25 retrieval.

This pipeline wraps RetrievalPipelineService with BM25DBModule, providing a convenient interface for BM25-based retrieval using PostgreSQL's VectorChord-BM25 extension.

BM25 does not require embeddings, so both _retrieve_by_id() and _retrieve_by_text() work without any embedding model.

Example
from autorag_research.orm.connection import DBConnection
from autorag_research.pipelines.retrieval.bm25 import BM25RetrievalPipeline

db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

# Initialize pipeline
pipeline = BM25RetrievalPipeline(
    session_factory=session_factory,
    name="bm25_baseline",
    tokenizer="bert",
)

# Run pipeline on all queries in DB
results = pipeline.run(top_k=10)

# Or retrieve for a single query
chunks = await pipeline.retrieve("What is machine learning?", top_k=10)

__init__(session_factory, name, tokenizer='bert', index_name=DEFAULT_BM25_INDEX_NAME, schema=None)

Initialize BM25 retrieval pipeline.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy sessionmaker for database connections.

required
name str

Name for this pipeline.

required
tokenizer str

Tokenizer name for BM25 (default: "bert" for bert-base-uncased). Available tokenizers (pg_tokenizer pre-built models):

- "bert": bert-base-uncased (Hugging Face) - default
- "wiki_tocken": Wikitext-103 trained model
- "gemma2b": Google lightweight model (~100MB memory)
- "llmlingua2": Microsoft summarization model (~200MB memory)

See: https://github.com/tensorchord/pg_tokenizer.rs/blob/main/docs/06-model.md

'bert'
index_name str

Name of the BM25 index (default: "idx_chunk_bm25").

DEFAULT_BM25_INDEX_NAME
schema Any | None

Schema namespace from create_schema(). If None, uses default schema.

None

Generation

autorag_research.pipelines.generation.base.BaseGenerationPipeline

Bases: BasePipeline, ABC

Abstract base class for all generation pipelines.

This class provides common functionality for generation pipelines:

- Composition with a retrieval pipeline for flexible retrieval strategies
- Service initialization for database operations
- Pipeline creation in database
- Abstract generate method for subclasses to implement

Subclasses must implement:

- _generate(): Async generate method given a query ID (has access to a retrieval pipeline)
- _get_pipeline_config(): Return the pipeline configuration dict

Example
class BasicRAGPipeline(BaseGenerationPipeline):
    async def _generate(self, query_id: int | str, top_k: int) -> GenerationResult:
        # Retrieve relevant chunks by query_id (async)
        results = await self._retrieval_pipeline._retrieve_by_id(query_id, top_k)
        chunk_ids = [r["doc_id"] for r in results]
        chunk_contents = self._service.get_chunk_contents(chunk_ids)

        # Get query text (uses query_to_llm if available, else contents)
        query_text = self._get_query_text(query_id)

        # Build prompt and generate (async)
        context = "\n\n".join(chunk_contents)
        prompt = f"Context:\n{context}\n\nQuestion: {query_text}\n\nAnswer:"
        response = await self._llm.ainvoke(prompt)

        return GenerationResult(text=str(response.content))

__init__(session_factory, name, llm, retrieval_pipeline, schema=None)

Initialize generation pipeline.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy sessionmaker for database connections.

required
name str

Name for this pipeline.

required
llm BaseLanguageModel

LangChain BaseLanguageModel instance for text generation.

required
retrieval_pipeline BaseRetrievalPipeline

Retrieval pipeline instance for fetching relevant context.

required
schema Any | None

Schema namespace from create_schema(). If None, uses default schema.

None

run(top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)

Run the generation pipeline.

Parameters:

Name Type Description Default
top_k int

Number of top documents to retrieve per query.

10
batch_size int

Number of queries to fetch from DB at once.

128
max_concurrency int

Maximum number of concurrent async operations.

16
max_retries int

Maximum number of retry attempts for failed queries.

3
retry_delay float

Base delay in seconds for exponential backoff between retries.

1.0
query_limit int | None

Maximum number of queries to process. None means no limit.

None

Returns:

Type Description
dict[str, Any]

Dictionary with pipeline execution statistics:

• pipeline_id: The pipeline ID
• total_queries: Number of queries processed successfully
• total_tokens: Total tokens used (if available)
• avg_execution_time_ms: Average execution time per query
• failed_queries: List of query IDs that failed after all retries

autorag_research.pipelines.generation.basic_rag.BasicRAGPipeline

Bases: BaseGenerationPipeline

Simple single-call RAG pipeline: retrieve once -> build prompt -> generate once.

This pipeline implements the most basic RAG pattern:

1. Take a query
2. Retrieve relevant chunks using the composed retrieval pipeline
3. Build a prompt with the retrieved context
4. Call the LLM once to generate the answer

The retrieval pipeline can be any BaseRetrievalPipeline implementation (BM25, vector search, hybrid, HyDE, etc.), providing flexibility in the retrieval strategy while keeping the generation simple.

Example
from langchain_openai import ChatOpenAI

from autorag_research.orm.connection import DBConnection
from autorag_research.pipelines.generation.basic_rag import BasicRAGPipeline
from autorag_research.pipelines.retrieval.bm25 import BM25RetrievalPipeline

db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

# Create retrieval pipeline
retrieval_pipeline = BM25RetrievalPipeline(
    session_factory=session_factory,
    name="bm25_baseline",
    tokenizer="bert",
)

# Create generation pipeline
pipeline = BasicRAGPipeline(
    session_factory=session_factory,
    name="basic_rag_v1",
    llm=ChatOpenAI(model="gpt-4"),
    retrieval_pipeline=retrieval_pipeline,
)

# Run pipeline
results = pipeline.run(top_k=5)

__init__(session_factory, name, llm, retrieval_pipeline, prompt_template=DEFAULT_PROMPT_TEMPLATE, schema=None)

Initialize Basic RAG pipeline.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy sessionmaker for database connections.

required
name str

Name for this pipeline.

required
llm BaseLanguageModel

LangChain BaseLanguageModel instance for text generation.

required
retrieval_pipeline BaseRetrievalPipeline

Retrieval pipeline for fetching relevant context.

required
prompt_template str

Template string with {context} and {query} placeholders.

DEFAULT_PROMPT_TEMPLATE
schema Any | None

Schema namespace from create_schema(). If None, uses default schema.

None
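
The prompt_template parameter above is a plain Python format string; a custom template must keep both the {context} and {query} placeholders. A minimal sketch (the template text itself is illustrative, not the library default):

```python
CUSTOM_TEMPLATE = (
    "Answer using only the context below.\n\n"
    "Context:\n{context}\n\n"
    "Question: {query}\n"
    "Answer:"
)

# The pipeline fills both placeholders before calling the LLM:
prompt = CUSTOM_TEMPLATE.format(
    context="Paris is the capital of France.",
    query="What is the capital of France?",
)
```

Pass it as BasicRAGPipeline(..., prompt_template=CUSTOM_TEMPLATE).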

Metrics

Retrieval

autorag_research.evaluation.metrics.retrieval

F1Config dataclass

Bases: BaseRetrievalMetricConfig

Configuration for retrieval F1 metric.

get_metric_func()

Return the metric function.

FullRecallConfig dataclass

Bases: BaseRetrievalMetricConfig

Configuration for retrieval full recall metric.

get_metric_func()

Return the metric function.

MAPConfig dataclass

Bases: BaseRetrievalMetricConfig

Configuration for retrieval MAP metric.

get_metric_func()

Return the metric function.

MRRConfig dataclass

Bases: BaseRetrievalMetricConfig

Configuration for retrieval MRR metric.

get_metric_func()

Return the metric function.

NDCGConfig dataclass

Bases: BaseRetrievalMetricConfig

Configuration for retrieval NDCG metric.

get_metric_func()

Return the metric function.

PrecisionConfig dataclass

Bases: BaseRetrievalMetricConfig

Configuration for retrieval precision metric.

get_metric_func()

Return the metric function.

RecallConfig dataclass

Bases: BaseRetrievalMetricConfig

Configuration for retrieval recall metric.

get_metric_func()

Return the metric function.

retrieval_f1(metric_input)

Compute f1 score for retrieval.

Parameters:

Name Type Description Default
metric_input MetricInput

The MetricInput schema for AutoRAG metric.

required

Returns:

Type Description
float

The f1 score.

retrieval_full_recall(metric_input)

Compute full recall (binary) for retrieval.

Returns 1.0 if ALL ground truth groups are satisfied (at least one item from each OR-group is retrieved), 0.0 otherwise.

Parameters:

Name Type Description Default
metric_input MetricInput

The MetricInput schema for AutoRAG metric.

required

Returns:

Type Description
float

1.0 if all GT groups are satisfied, 0.0 otherwise.
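
The AND-OR semantics can be reimplemented in a few lines (an illustrative sketch, not the library function, which operates on the MetricInput schema):

```python
def full_recall(gt_groups: list[list[str]], retrieved: list[str]) -> float:
    """1.0 iff every OR-group contributes at least one retrieved item."""
    hits = set(retrieved)
    return 1.0 if all(hits & set(group) for group in gt_groups) else 0.0

# GT: (A OR B) AND C
ok = full_recall([["A", "B"], ["C"]], ["B", "C"])    # -> 1.0, both groups hit
miss = full_recall([["A", "B"], ["C"]], ["A", "B"])  # -> 0.0, group [C] unmet
```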

retrieval_map(metric_input)

Compute MAP (Mean Average Precision) score for retrieval.

Mean Average Precision (MAP) is the mean of Average Precision (AP) for all queries.

Parameters:

Name Type Description Default
metric_input MetricInput

The MetricInput schema for AutoRAG metric.

required

Returns:

Type Description
float

The MAP score.

retrieval_mrr(metric_input)

Compute MRR (Mean Reciprocal Rank) score for retrieval.

Reciprocal Rank (RR) is the reciprocal of the rank of the first relevant item; the mean of RR over all queries is the MRR.

Parameters:

Name Type Description Default
metric_input MetricInput

The MetricInput schema for AutoRAG metric.

required

Returns:

Type Description
float

The MRR score.
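
The computation is small enough to sketch directly (illustrative; the library function operates on the MetricInput schema and assumes its ground-truth structure):

```python
def reciprocal_rank(gt_ids: set[str], retrieved: list[str]) -> float:
    """1/rank of the first relevant retrieved item, 0.0 if none is relevant."""
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in gt_ids:
            return 1.0 / rank
    return 0.0

def mrr(per_query: list[tuple[set[str], list[str]]]) -> float:
    return sum(reciprocal_rank(gt, r) for gt, r in per_query) / len(per_query)

# First relevant item at rank 2 for one query, rank 1 for another:
score = mrr([({"A"}, ["X", "A"]), ({"B"}, ["B", "Y"])])  # -> (0.5 + 1.0) / 2 = 0.75
```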

retrieval_ndcg(metric_input)

Compute NDCG for multi-hop retrieval with AND-OR group semantics.

Ground truth structure: [[A, B], [C]] means (A OR B) AND C.

- Each inner list is an OR group (any item satisfies the group)
- The outer list is AND (all groups must be satisfied for complete retrieval)

A retrieved item contributes to DCG only when it is the FIRST to satisfy a previously unsatisfied group. Subsequent items from the same group don't add value (they're redundant for answering the query).

Supports graded relevance when metric_input.relevance_scores is provided. Falls back to binary relevance (score=1) when relevance_scores is None.

Parameters:

Name Type Description Default
metric_input MetricInput

The MetricInput schema for AutoRAG metric.

- retrieval_gt: 2D list of ground truth IDs (AND/OR structure)
- retrieved_ids: list of retrieved IDs
- relevance_scores: optional dict mapping doc_id -> graded relevance score (e.g., 0 = not relevant, 1 = somewhat relevant, 2 = highly relevant)

required

Returns:

Type Description
float

The NDCG score.

Examples:

GT: [[A, B], [C]] (need A-or-B AND C)

- Retrieved: [A, C] -> Perfect (both groups satisfied at top positions)
- Retrieved: [A, B] -> Partial (group 1 not satisfied; B is redundant)
- Retrieved: [C, A] -> Good but suboptimal ordering
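
The group-aware gain rule can be sketched with binary relevance (an illustrative reimplementation; the library function additionally supports graded relevance_scores):

```python
import math

def group_ndcg(gt_groups: list[list[str]], retrieved: list[str]) -> float:
    """Binary NDCG where only the first item to satisfy a still-unsatisfied
    OR-group earns gain; later items from the same group are redundant."""
    unsatisfied = [set(g) for g in gt_groups]
    dcg = 0.0
    for pos, doc_id in enumerate(retrieved, start=1):
        for group in unsatisfied:
            if doc_id in group:
                dcg += 1.0 / math.log2(pos + 1)
                unsatisfied.remove(group)
                break
    # Ideal ranking satisfies every group at the top positions.
    idcg = sum(1.0 / math.log2(pos + 1) for pos in range(1, len(gt_groups) + 1))
    return dcg / idcg

# GT: (A OR B) AND C
perfect = group_ndcg([["A", "B"], ["C"]], ["A", "C"])  # -> 1.0
partial = group_ndcg([["A", "B"], ["C"]], ["A", "B"])  # < 1.0: B adds no gain
```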

retrieval_precision(metric_input)

Compute precision score for retrieval.

Parameters:

Name Type Description Default
metric_input MetricInput

The MetricInput schema for AutoRAG metric.

required

Returns:

Type Description
float

The precision score.

retrieval_recall(metric_input)

Compute recall score for retrieval.

Parameters:

Name Type Description Default
metric_input MetricInput

The MetricInput schema for AutoRAG metric.

required

Returns:

Type Description
float

The recall score.

Generation

autorag_research.evaluation.metrics.generation

BertScoreConfig dataclass

Bases: BaseGenerationMetricConfig

Configuration for BERTScore metric.

Attributes:

Name Type Description
lang str

Language code for the text.

batch int

Batch size for processing.

n_threads int | None

Number of threads to use.

__post_init__()

Set default n_threads if not provided.

get_metric_func()

Return the metric function.

get_metric_kwargs()

Return kwargs for the metric function.

BleuConfig dataclass

Bases: BaseGenerationMetricConfig

Configuration for BLEU metric.

Attributes:

Name Type Description
tokenize str | None

The tokenizer to use. If None, defaults to language-specific tokenizers.

smooth_method str

The smoothing method ('floor', 'add-k', 'exp' or 'none').

smooth_value float | None

The smoothing value for 'floor' and 'add-k' methods.

max_ngram_order int

Maximum n-gram order when computing precisions.

effective_order bool

Stop including n-gram orders for which precision is 0.

get_metric_func()

Return the metric function.

get_metric_kwargs()

Return kwargs for the metric function.

MeteorConfig dataclass

Bases: BaseGenerationMetricConfig

Configuration for METEOR metric.

Attributes:

Name Type Description
alpha float

Parameter for controlling relative weights of precision and recall.

beta float

Parameter for controlling shape of penalty as a function of fragmentation.

gamma float

Relative weight assigned to fragmentation penalty.

get_metric_func()

Return the metric function.

get_metric_kwargs()

Return kwargs for the metric function.

ResponseRelevancyConfig dataclass

Bases: BaseGenerationMetricConfig

Configuration for RAGAS-style response relevancy metric.

get_metric_func()

Return the metric function.

get_metric_kwargs()

Return kwargs for the metric function.

RougeConfig dataclass

Bases: BaseGenerationMetricConfig

Configuration for ROUGE metric.

Attributes:

Name Type Description
rouge_type str

Rouge type to use ('rouge1', 'rouge2', 'rougeL', 'rougeLSum').

use_stemmer bool

Whether to use Porter stemmer for word suffix stripping.

split_summaries bool

Whether to add newlines between sentences for rougeLsum.

get_metric_func()

Return the metric function.

get_metric_kwargs()

Return kwargs for the metric function.

get_metric_name()

Return the metric name.

SemScoreConfig dataclass

Bases: BaseGenerationMetricConfig

Configuration for SemScore (semantic similarity) metric.

Attributes:

Name Type Description
embedding_model Embeddings | str

Embedding model config name (e.g., "openai-large") or Embeddings instance.

truncate_length int

Maximum length of texts to embed.

get_metric_func()

Return the metric function.

get_metric_kwargs()

Return kwargs for the metric function.

bert_score(metric_inputs, lang='en', batch=128, n_threads=os.cpu_count())

Compute BERTScore metric for generation.

Parameters:

Name Type Description Default
metric_inputs list[MetricInput]

A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts").

required
lang str

Language code for the text. Default is "en".

'en'
batch int

Batch size for processing. Default is 128.

128
n_threads int | None

Number of threads to use. Default is the number of CPU cores.

cpu_count()

Returns:

Type Description
list[float]

A list of BERTScore F1 scores.

bleu(metric_inputs, tokenize=None, smooth_method='exp', smooth_value=None, max_ngram_order=4, trg_lang='', effective_order=True, **kwargs)

Computes the BLEU metric given pred and ground-truth.

Parameters:

Name Type Description Default
metric_inputs list[MetricInput]

A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts").

required
tokenize str | None

The tokenizer to use. If None, defaults to language-specific tokenizers with '13a' as the fallback default. See https://github.com/mjpost/sacrebleu/blob/master/sacrebleu/metrics/bleu.py

None
smooth_method str

The smoothing method to use ('floor', 'add-k', 'exp' or 'none').

'exp'
smooth_value float | None

The smoothing value for floor and add-k methods. None falls back to default value.

None
max_ngram_order int

If given, it overrides the maximum n-gram order (default: 4) when computing precisions.

4
trg_lang str

An optional language code to raise potential tokenizer warnings.

''
effective_order bool

If True, stop including n-gram orders for which precision is 0. This should be True if sentence-level BLEU is being computed.

True
**kwargs Any

Additional arguments.

{}

Returns:

Type Description
list[float]

A list of BLEU scores.

huggingface_evaluate(instance, key, metric_inputs, **kwargs)

Compute a metric via the huggingface evaluate library.

Parameters:

Name Type Description Default
instance Any

The metric instance from the huggingface evaluate library.

required
key str

The key to retrieve result score from huggingface evaluate result.

required
metric_inputs list[MetricInput]

A list of MetricInput schema.

required
**kwargs Any

The additional arguments for metric function.

{}

Returns:

Type Description
list[float]

The list of scores.

meteor(metric_inputs, alpha=0.9, beta=3.0, gamma=0.5)

Compute meteor score for generation.

Parameters:

Name Type Description Default
metric_inputs list[MetricInput]

A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts").

required
alpha float

Parameter for controlling relative weights of precision and recall. Default is 0.9.

0.9
beta float

Parameter for controlling shape of penalty as a function of fragmentation. Default is 3.0.

3.0
gamma float

Relative weight assigned to fragmentation penalty. Default is 0.5.

0.5

Returns:

Type Description
list[float]

A list of computed metric scores.

response_relevancy(metric_inputs, llm, embedding_model, strictness=3, prompt_template=DEFAULT_RESPONSE_RELEVANCY_PROMPT)

Compute a RAGAS-style response relevancy metric without the ragas dependency.

rouge(metric_inputs, rouge_type='rougeL', use_stemmer=False, split_summaries=False)

Compute rouge score for generation.

Parameters:

Name Type Description Default
metric_inputs list[MetricInput]

A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts").

required
rouge_type str

Rouge type to use for evaluation ('rouge1', 'rouge2', 'rougeL', or 'rougeLSum'). Default is 'rougeL'. - rouge1: unigram (1-gram) based scoring. - rouge2: bigram (2-gram) based scoring. - rougeL: Longest Common Subsequence based scoring. - rougeLSum: splits text using "\n".

'rougeL'
use_stemmer bool

Whether the Porter stemmer should be used to strip word suffixes to improve matching. This arg is used in the DefaultTokenizer, but other tokenizers might or might not choose to use it. Default is False.

False
split_summaries bool

Whether to add newlines between sentences for rougeLsum. Default is False.

False

Returns:

Type Description
list[float]

A list of computed metric scores.

sem_score(metric_inputs, embedding_model, truncate_length=4096)

Compute SemScore between generation ground truth and prediction using cosine similarity.

Parameters:

Name Type Description Default
metric_inputs list[MetricInput]

A list of MetricInput schema (Required Field -> "generation_gt", "generated_texts").

required
embedding_model Embeddings | str

Embedding model to use for computing cosine similarity. Can be an Embeddings instance or a string config name (e.g., "openai-large").

required
truncate_length int

Maximum length of texts to embed. Default is 4096.

4096

Returns:

Type Description
list[float]

A list of computed metric scores.

Data Ingestion

autorag_research.data.base.DataIngestor

Bases: ABC

detect_primary_key_type() abstractmethod

Detect the primary key type used in the dataset.

ingest(subset='test', query_limit=None, min_corpus_cnt=None) abstractmethod

Ingest data from the specified source. This step does not include embedding.

Parameters:

Name Type Description Default
subset Literal['train', 'dev', 'test']

Dataset split to ingest (train, dev, or test).

'test'
query_limit int | None

Maximum number of queries to ingest. None means no limit.

None
min_corpus_cnt int | None

Minimum number of corpus items to ingest. When set, gold IDs from the selected queries are always included, plus random samples to reach this count. None means no limit.

None

autorag_research.data.base.TextEmbeddingDataIngestor

Bases: DataIngestor, ABC

embed_all(max_concurrency=16, batch_size=128)

Embed all queries and text chunks.

autorag_research.data.base.MultiModalEmbeddingDataIngestor

Bases: DataIngestor, ABC

embed_all(max_concurrency=16, batch_size=128)

Embed all queries and image chunks using single-vector embedding model.

embed_all_late_interaction(max_concurrency=16, batch_size=128)

Embed all queries and image chunks using multi-vector embedding model.

ORM

Repository

autorag_research.orm.repository

Repository module for AutoRAG-Research ORM.

This module provides repository classes for data access layer operations.

BaseVectorRepository

Bases: GenericRepository[T]

Base repository with vector search capabilities.

Extends GenericRepository with vector search methods for use with pgvector and VectorChord for efficient similarity search.

maxsim_search(query_vectors, vector_column='embeddings', limit=10)

Perform MaxSim search using VectorChord's @# operator.

MaxSim computes late interaction similarity: for each query vector, find the closest document vector, compute dot product, and sum results.

Parameters:

Name Type Description Default
query_vectors list[list[float]]

List of query embedding vectors (multi-vector query).

required
vector_column str

Name of the multi-vector column to search.

'embeddings'
limit int

Maximum number of results to return.

10

Returns:

Type Description
list[tuple[T, float]]

List of tuples (entity, distance_score) ordered by similarity. Lower distance scores indicate higher similarity. The distance score is calculated as (1 - maxsim_score), so the range is [-infinity, 0]. You might normalize this score by dividing by the number of query vectors.

Note

Requires VectorChord extension and vchordrq index with vector_maxsim_ops. Example index: CREATE INDEX ON table USING vchordrq (embeddings vector_maxsim_ops);
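
The late-interaction score the operator computes can be sketched in pure Python (illustrative only; the real computation happens inside VectorChord). The search orders by 1 - maxsim_score, and dividing by the number of query vectors gives a per-vector normalization:

```python
def maxsim(query_vectors: list[list[float]], doc_vectors: list[list[float]]) -> float:
    """For each query vector, take the best dot product against any doc vector; sum them."""
    def dot(a: list[float], b: list[float]) -> float:
        return sum(x * y for x, y in zip(a, b))
    return sum(max(dot(q, d) for d in doc_vectors) for q in query_vectors)

q = [[1.0, 0.0], [0.0, 1.0]]   # multi-vector query
d = [[1.0, 0.0], [0.5, 0.5]]   # multi-vector document
score = maxsim(q, d)           # 1.0 + 0.5 = 1.5
distance = 1.0 - score         # the quantity the search orders by
normalized = distance / len(q) # optional per-query-vector normalization
```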

maxsim_search_with_ids(query_vectors, vector_column='embeddings', id_column='id', limit=10)

Perform MaxSim search and return only IDs with scores.

This is more efficient when you only need IDs and scores.

Parameters:

Name Type Description Default
query_vectors list[list[float]]

List of query embedding vectors (multi-vector query).

required
vector_column str

Name of the multi-vector column to search.

'embeddings'
id_column str

Name of the primary key column.

'id'
limit int

Maximum number of results to return.

10

Returns:

Type Description
list[tuple[int | str, float]]

List of tuples (entity_id, distance_score) ordered by similarity.

set_multi_vector_embedding(entity_id, embeddings, vector_column='embeddings', id_column='id')

Set multi-vector embedding for an entity using raw SQL.

This method bypasses SQLAlchemy's type processing to properly format vector arrays for VectorChord compatibility.

Parameters:

Name Type Description Default
entity_id int | str

The entity's primary key.

required
embeddings list[list[float]]

List of embedding vectors (list of list of floats).

required
vector_column str

Name of the multi-vector column (default: "embeddings").

'embeddings'
id_column str

Name of the primary key column (default: "id").

'id'

Returns:

Type Description
bool

True if update was successful, False otherwise.

set_multi_vector_embeddings_batch(entity_ids, embeddings_list, vector_column='embeddings', id_column='id')

Batch set multi-vector embeddings for multiple entities.

Parameters:

Name Type Description Default
entity_ids list[int | str]

List of entity primary keys.

required
embeddings_list list[list[list[float]]]

List of multi-vector embeddings (one per entity).

required
vector_column str

Name of the multi-vector column (default: "embeddings").

'embeddings'
id_column str

Name of the primary key column (default: "id").

'id'

Returns:

Type Description
int

Number of entities successfully updated.

Perform vector similarity search using VectorChord's cosine distance.

Uses raw SQL with VectorChord's <=> operator for cosine distance. This approach avoids SQLAlchemy type processing issues with Vector objects.

Parameters:

Name Type Description Default
query_vector list[float]

The query embedding vector as a plain Python list of floats.

required
vector_column str

Name of the vector column to search.

'embedding'
limit int

Maximum number of results to return.

10

Returns:

Type Description
list[T]

List of entities ordered by similarity (most similar first).

Note

Requires VectorChord extension and vchordrq index on the embedding column. Example index: CREATE INDEX ON table USING vchordrq (embedding vector_cosine_ops);

vector_search_with_scores(query_vector, vector_column='embedding', limit=10)

Perform vector similarity search using VectorChord's cosine distance.

Uses raw SQL with VectorChord's <=> operator for cosine distance. This approach avoids SQLAlchemy type processing issues with Vector objects.

Parameters:

Name Type Description Default
query_vector list[float]

The query embedding vector as a plain Python list of floats.

required
vector_column str

Name of the vector column to search.

'embedding'
limit int

Maximum number of results to return.

10

Returns:

Type Description
list[tuple[T, float]]

List of tuples (entity, distance_score) ordered by similarity.

list[tuple[T, float]]

Lower distance scores indicate higher similarity.

list[tuple[T, float]]

The score is the cosine distance, which is calculated as (1 - cosine_similarity).

list[tuple[T, float]]

0 means identical, 1 means orthogonal, and 2 means opposite.

Note

Requires VectorChord extension and vchordrq index on the embedding column. Example index: CREATE INDEX ON table USING vchordrq (embedding vector_cosine_ops);
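The distance values described above can be reproduced with a small pure-Python sketch of cosine distance (the actual computation is done in PostgreSQL by the <=> operator, not in Python):

```python
import math

def cosine_distance(a, b):
    """Cosine distance = 1 - cosine similarity, matching the <=> operator's semantics."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / norm

identical = cosine_distance([1.0, 0.0], [1.0, 0.0])   # 0.0
orthogonal = cosine_distance([1.0, 0.0], [0.0, 1.0])  # 1.0
opposite = cosine_distance([1.0, 0.0], [-1.0, 0.0])   # 2.0
```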

ChunkRepository

Bases: BaseVectorRepository[Any], BaseEmbeddingRepository[Any]

Repository for Chunk entity with vector search capabilities.

__init__(session, model_cls=None)

Initialize chunk repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The Chunk model class to use. If None, uses default schema.

None

Perform VectorChord BM25 search.

Uses VectorChord-BM25's <&> operator for full-text search. Returns entities with their BM25 scores (converted to positive values).

Parameters:

Name Type Description Default
query_text str

The query text to search for.

required
index_name str

Name of the BM25 index (default: "idx_chunk_bm25").

'idx_chunk_bm25'
limit int

Maximum number of results to return.

10
tokenizer str

Tokenizer to use for query (default: "bert"). Available tokenizers (pg_tokenizer pre-built models):

- "bert": bert-base-uncased (Hugging Face). Default.
- "wiki_tocken": model trained on Wikitext-103.
- "gemma2b": Google's lightweight model (~100MB memory).
- "llmlingua2": Microsoft's summarization model (~200MB memory).

See: https://github.com/tensorchord/pg_tokenizer.rs/blob/main/docs/06-model.md

'bert'

Returns:

Type Description
list[tuple[Any, float]]

List of tuples (entity, score) ordered by relevance.

list[tuple[Any, float]]

Higher scores indicate higher relevance.

Note

BM25 scores from VectorChord are negative (more negative = more relevant). This method negates the scores so higher = more relevant.
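The sign flip described in the note can be sketched in a few lines; the scores and chunk labels here are hypothetical:

```python
# Raw VectorChord-BM25 scores: more negative = more relevant.
raw = [(-3.1, "chunk B"), (-12.4, "chunk A")]

# Negate so higher = more relevant, then order by descending score,
# mirroring what the repository does before returning (entity, score) tuples.
results = sorted(((-score, chunk) for score, chunk in raw), reverse=True)
```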

get_by_contents_exact(contents)

Retrieve chunks with exact contents match.

Parameters:

Name Type Description Default
contents str

The exact contents to search for.

required

Returns:

Type Description
list[Any]

List of chunks with matching contents.

get_by_table_type(table_type)

Retrieve chunks with a specific table_type.

Parameters:

Name Type Description Default
table_type str

The table format type (e.g., 'markdown', 'xml', 'html').

required

Returns:

Type Description
list[Any]

List of chunks with the specified table_type.

get_chunks_with_empty_content(limit=None)

Retrieve chunks that have empty or whitespace-only contents.

Parameters:

Name Type Description Default
limit int | None

Maximum number of results to return.

None

Returns:

Type Description
list[Any]

List of chunks with empty content.

get_non_table_chunks()

Retrieve chunks that are not tables (is_table=False).

Returns:

Type Description
list[Any]

List of chunks where is_table is False.

get_table_chunks()

Retrieve chunks that are tables (is_table=True).

Returns:

Type Description
list[Any]

List of chunks where is_table is True.

get_with_all_relations(chunk_id)

Retrieve a chunk with all relationships eagerly loaded.

Parameters:

Name Type Description Default
chunk_id int | str

The chunk ID.

required

Returns:

Type Description
Any | None

The chunk with all relations loaded, None if not found.

get_with_chunk_retrieved_results(chunk_id)

Retrieve a chunk with its chunk retrieved results eagerly loaded.

Parameters:

Name Type Description Default
chunk_id int | str

The chunk ID.

required

Returns:

Type Description
Any | None

The chunk with chunk retrieved results loaded, None if not found.

get_with_page_chunk_relations(chunk_id)

Retrieve a chunk with its page-chunk relations eagerly loaded.

Parameters:

Name Type Description Default
chunk_id int | str

The chunk ID.

required

Returns:

Type Description
Any | None

The chunk with page-chunk relations loaded, None if not found.

get_with_retrieval_relations(chunk_id)

Retrieve a chunk with its retrieval relations eagerly loaded.

Parameters:

Name Type Description Default
chunk_id int | str

The chunk ID.

required

Returns:

Type Description
Any | None

The chunk with retrieval relations loaded, None if not found.

search_by_contents(search_text)

Search chunks by contents using SQL LIKE.

Parameters:

Name Type Description Default
search_text str

The text to search for (use % as wildcard).

required

Returns:

Type Description
list[Any]

List of matching chunks.
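For readers unfamiliar with SQL LIKE wildcards, the matching semantics can be approximated in pure Python (a sketch of the pattern language only, not the repository's query):

```python
import re

def sql_like(text: str, pattern: str) -> bool:
    """Minimal SQL LIKE semantics: % matches any run of characters, _ matches one."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, text, flags=re.DOTALL) is not None

match_any = sql_like("vector search basics", "%search%")  # True
exact_only = sql_like("vector search basics", "search")   # False: no % means exact match
```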

ChunkRetrievedResultRepository

Bases: BaseRetrievedResultRepository

Repository for ChunkRetrievedResult entity.

__init__(session, model_cls=None)

Initialize chunk retrieved result repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The ChunkRetrievedResult model class to use. If None, uses default schema.

None

DocumentRepository

Bases: GenericRepository

Repository for Document entity with specialized queries.

__init__(session, model_cls=None)

Initialize document repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The Document model class. If None, uses default schema.

None

count_pages(document_id)

Count the number of pages in a document.

Parameters:

Name Type Description Default
document_id int | str

The document ID.

required

Returns:

Type Description
int

Number of pages in the document.

get_all_with_pages(limit=None, offset=None)

Retrieve all documents with their pages eagerly loaded.

Parameters:

Name Type Description Default
limit int | None

Maximum number of results to return.

None
offset int | None

Number of results to skip.

None

Returns:

Type Description
list[Any]

List of documents with pages loaded.

get_by_author(author)

Retrieve all documents by a specific author.

Parameters:

Name Type Description Default
author str

The author name to search for.

required

Returns:

Type Description
list[Any]

List of documents by the author.

get_by_filename(filename)

Retrieve a document by its filename.

Parameters:

Name Type Description Default
filename str

The filename to search for.

required

Returns:

Type Description
Any | None

The document if found, None otherwise.

get_by_path_id(path_id)

Retrieve a document by its file path ID.

Parameters:

Name Type Description Default
path_id int | str

The file ID to search for.

required

Returns:

Type Description
Any | None

The document if found, None otherwise.

get_by_title(title)

Retrieve a document by its title.

Parameters:

Name Type Description Default
title str

The title to search for.

required

Returns:

Type Description
Any | None

The document if found, None otherwise.

get_with_file(document_id)

Retrieve a document with its file eagerly loaded.

Parameters:

Name Type Description Default
document_id int | str

The document ID.

required

Returns:

Type Description
Any | None

The document with file loaded, None if not found.

get_with_pages(document_id)

Retrieve a document with its pages eagerly loaded.

Parameters:

Name Type Description Default
document_id int | str

The document ID.

required

Returns:

Type Description
Any | None

The document with pages loaded, None if not found.

search_by_metadata(metadata_key, metadata_value)

Search documents by metadata field.

Parameters:

Name Type Description Default
metadata_key str

The key in the JSONB metadata field.

required
metadata_value str

The value to search for.

required

Returns:

Type Description
list[Any]

List of matching documents.

ExecutorResultRepository

Bases: GenericRepository[Any]

Repository for ExecutorResult entity with composite key support.

__init__(session, model_cls=None)

Initialize executor result repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The ExecutorResult model class to use. If None, uses default schema.

None

delete_by_composite_key(query_id, pipeline_id)

Delete an executor result by its composite primary key.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
bool

True if the result was deleted, False if not found.

delete_by_pipeline(pipeline_id)

Delete all executor results for a specific pipeline.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
int

Number of deleted records.

exists_by_composite_key(query_id, pipeline_id)

Check if an executor result exists with the given composite key.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
bool

True if the result exists, False otherwise.

get_by_composite_key(query_id, pipeline_id)

Retrieve an executor result by its composite primary key.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
Any | None

The executor result if found, None otherwise.

get_by_execution_time_range(pipeline_id, min_time, max_time)

Retrieve executor results within an execution time range.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID.

required
min_time int

Minimum execution time (inclusive).

required
max_time int

Maximum execution time (inclusive).

required

Returns:

Type Description
list[Any]

List of executor results within the specified range.

get_by_pipeline_id(pipeline_id)

Retrieve all executor results for a specific pipeline.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
list[Any]

List of executor results for the pipeline.

get_by_queries_and_pipeline(query_ids, pipeline_id)

Retrieve executor results for multiple queries under a specific pipeline.

Parameters:

Name Type Description Default
query_ids list[int | str]

List of query IDs.

required
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
list[Any]

List of executor results matching the criteria.


get_by_query_id(query_id)

Retrieve all executor results for a specific query.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required

Returns:

Type Description
list[Any]

List of executor results for the query.

get_with_all_relations(query_id, pipeline_id)

Retrieve an executor result with all relations eagerly loaded.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
Any | None

The executor result with all relations loaded, None if not found.

FileRepository

Bases: GenericRepository

Repository for File entity with specialized queries.

__init__(session, model_cls=None)

Initialize file repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The File model class. If None, uses default schema.

None

count_by_type(file_type)

Count the number of files of a specific type.

Parameters:

Name Type Description Default
file_type str

The file type to count.

required

Returns:

Type Description
int

Number of files of the specified type.

get_all_by_type(file_type, limit=None, offset=None)

Retrieve all files of a specific type with pagination.

Parameters:

Name Type Description Default
file_type str

The file type to filter by.

required
limit int | None

Maximum number of results to return.

None
offset int | None

Number of results to skip.

None

Returns:

Type Description
list[Any]

List of files of the specified type.

get_all_types()

Get all unique file types in the database.

Returns:

Type Description
list[ColumnElement[Any]]

List of unique file types.

get_by_path(path)

Retrieve a file by its path.

Parameters:

Name Type Description Default
path str

The file path to search for.

required

Returns:

Type Description
Any | None

The file if found, None otherwise.

get_by_type(file_type)

Retrieve all files of a specific type.

Parameters:

Name Type Description Default
file_type str

The file type (raw, image, audio, video).

required

Returns:

Type Description
list[Any]

List of files of the specified type.

get_with_documents(file_id)

Retrieve a file with its documents eagerly loaded.

Parameters:

Name Type Description Default
file_id int | str

The file ID.

required

Returns:

Type Description
Any | None

The file with documents loaded, None if not found.

search_by_path_pattern(pattern)

Search files by path pattern using SQL LIKE.

Parameters:

Name Type Description Default
pattern str

The pattern to search for (use % as wildcard).

required

Returns:

Type Description
list[Any]

List of matching files.

GenericRepository

Bases: Generic[T]

Generic repository implementing common CRUD operations.

This base class provides reusable database operations that can be extended by specific repositories for custom business logic.
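The shape of this class can be illustrated with an in-memory stand-in. This sketch is not the library's implementation (the real class wraps a SQLAlchemy Session and model class); it only shows the CRUD surface documented below, and assumes entities expose an `id` attribute:

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")

class InMemoryRepository(Generic[T]):
    """Dict-backed stand-in for GenericRepository (illustrative only)."""

    def __init__(self) -> None:
        self._store: dict = {}

    def add(self, entity: T) -> T:
        self._store[entity.id] = entity  # assumes an `id` attribute
        return entity

    def get_by_id(self, _id):
        return self._store.get(_id)

    def exists(self, _id) -> bool:
        return _id in self._store

    def count(self) -> int:
        return len(self._store)

    def delete_by_id(self, _id) -> bool:
        return self._store.pop(_id, None) is not None

@dataclass
class Chunk:
    id: int
    contents: str

repo = InMemoryRepository[Chunk]()
repo.add(Chunk(id=1, contents="hello"))
```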

__init__(session, model_cls)

Initialize repository with a session and model class.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type[T]

The SQLAlchemy model class this repository manages.

required

add(entity)

Add a new entity to the session.

Parameters:

Name Type Description Default
entity T

The entity instance to add.

required

Returns:

Type Description
T

The added entity.

add_all(entities)

Add multiple entities to the session.

Parameters:

Name Type Description Default
entities list[T]

List of entity instances to add.

required

Returns:

Type Description
list[T]

The added entities.

add_bulk(items)

Memory-efficient bulk insert using SQLAlchemy Core.

Unlike add_all(), this method does not create ORM objects in Python memory. Instead, it uses SQLAlchemy Core's insert(), which generates a single multi-row INSERT statement, significantly reducing memory usage and improving performance for large batch inserts.

Parameters:

Name Type Description Default
items list[dict]

List of dictionaries representing records to insert.

required

Returns:

Type Description
list[Any]

List of inserted IDs.

Note

For 1000 records, this method uses ~3-5x less memory than add_all() because it bypasses ORM object creation and identity map tracking. String values are automatically sanitized to remove NUL bytes for PostgreSQL compatibility.
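The NUL-byte sanitization mentioned in the note can be pictured with this stand-in (an illustration of the idea, not the library's actual helper):

```python
def sanitize_strings(record: dict) -> dict:
    """Strip NUL bytes (\\x00) from string values; PostgreSQL text columns reject them."""
    return {
        key: value.replace("\x00", "") if isinstance(value, str) else value
        for key, value in record.items()
    }

row = {"id": 1, "contents": "hello\x00world"}
clean = sanitize_strings(row)  # {"id": 1, "contents": "helloworld"}
```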

add_bulk_skip_duplicates(items)

Memory-efficient bulk insert that skips rows with duplicate primary keys.

Uses PostgreSQL's INSERT ... ON CONFLICT DO NOTHING to silently skip conflicting rows in a single SQL statement. This is useful when ingesting datasets that may contain duplicate primary keys (e.g., RAGBench).

Parameters:

Name Type Description Default
items list[dict]

List of dictionaries representing records to insert.

required

Returns:

Type Description
list[Any]

List of inserted IDs (excludes skipped duplicates).

Note

String values are automatically sanitized to remove NUL bytes for PostgreSQL compatibility.
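The conflict-skipping behavior can be demonstrated with SQLite, which accepts the same ON CONFLICT DO NOTHING clause as PostgreSQL. This is a small illustration of the SQL semantics, not the library's code path:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, contents TEXT)")

# The second row has a duplicate primary key and is silently skipped.
conn.executemany(
    "INSERT INTO items (id, contents) VALUES (?, ?) ON CONFLICT DO NOTHING",
    [(1, "first"), (1, "duplicate"), (2, "second")],
)
count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]       # 2
kept = conn.execute("SELECT contents FROM items WHERE id = 1").fetchone()[0]  # "first"
```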

count()

Count total number of entities.

Returns:

Type Description
int

Total count of entities.

delete(entity)

Delete an entity from the database.

Parameters:

Name Type Description Default
entity T

The entity instance to delete.

required

delete_by_id(_id)

Delete an entity by its primary key.

Parameters:

Name Type Description Default
_id Any

The primary key value.

required

Returns:

Type Description
bool

True if entity was deleted, False if not found.

exists(_id)

Check if an entity exists by its primary key.

Parameters:

Name Type Description Default
_id Any

The primary key value.

required

Returns:

Type Description
bool

True if entity exists, False otherwise.

get_all(limit=None, offset=None)

Retrieve all entities of this type.

Parameters:

Name Type Description Default
limit int | None

Maximum number of results to return.

None
offset int | None

Number of results to skip.

None

Returns:

Type Description
list[T]

List of all entities.

get_by_id(_id)

Retrieve an entity by its primary key.

Parameters:

Name Type Description Default
_id Any

The primary key value.

required

Returns:

Type Description
T | None

The entity if found, None otherwise.

get_by_ids(ids)

Retrieve multiple entities by their primary keys.

Parameters:

Name Type Description Default
ids list[Any]

List of primary key values.

required

Returns:

Type Description
list[T]

List of entities found (may be fewer than requested if some don't exist).

update(entity)

Update an existing entity.

Parameters:

Name Type Description Default
entity T

The entity instance to update.

required

Returns:

Type Description
T

The updated entity.

ImageChunkRepository

Bases: BaseVectorRepository[Any], BaseEmbeddingRepository[Any]

Repository for ImageChunk entity with vector search capabilities.

__init__(session, model_cls=None)

Initialize image chunk repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The ImageChunk model class to use. If None, uses default schema.

None

count_by_page(page_id)

Count the number of image chunks for a specific page.

Parameters:

Name Type Description Default
page_id int | str

The page ID.

required

Returns:

Type Description
int

Number of image chunks for the page.

get_by_page_id(page_id)

Retrieve all image chunks for a specific page.

Parameters:

Name Type Description Default
page_id int | str

The page ID.

required

Returns:

Type Description
list[Any]

List of image chunks belonging to the page.

get_with_all_relations(image_chunk_id)

Retrieve an image chunk with all relationships eagerly loaded.

Parameters:

Name Type Description Default
image_chunk_id int | str

The image chunk ID.

required

Returns:

Type Description
Any | None

The image chunk with all relations loaded, None if not found.

get_with_image_chunk_retrieved_results(image_chunk_id)

Retrieve an image chunk with its image chunk retrieved results eagerly loaded.

Parameters:

Name Type Description Default
image_chunk_id int | str

The image chunk ID.

required

Returns:

Type Description
Any | None

The image chunk with image chunk retrieved results loaded, None if not found.

get_with_page(image_chunk_id)

Retrieve an image chunk with its page eagerly loaded.

Parameters:

Name Type Description Default
image_chunk_id int | str

The image chunk ID.

required

Returns:

Type Description
Any | None

The image chunk with page loaded, None if not found.

get_with_retrieval_relations(image_chunk_id)

Retrieve an image chunk with its retrieval relations eagerly loaded.

Parameters:

Name Type Description Default
image_chunk_id int | str

The image chunk ID.

required

Returns:

Type Description
Any | None

The image chunk with retrieval relations loaded, None if not found.

MetricRepository

Bases: GenericRepository[Any]

Repository for Metric entity with relationship loading capabilities.

__init__(session, model_cls=None)

Initialize metric repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The Metric model class to use. If None, uses default schema.

None

exists_by_name(name)

Check if a metric exists with the given name.

Parameters:

Name Type Description Default
name str

The metric name to check.

required

Returns:

Type Description
bool

True if a metric exists, False otherwise.

exists_by_name_and_type(name, metric_type)

Check if a metric exists with the given name and type.

Parameters:

Name Type Description Default
name str

The metric name to check.

required
metric_type str

The metric type (retrieval or generation).

required

Returns:

Type Description
bool

True if a metric exists, False otherwise.

get_all_generation_metrics()

Retrieve all generation metrics.

Returns:

Type Description
list[Any]

List of all generation metrics ordered by name.

get_all_retrieval_metrics()

Retrieve all retrieval metrics.

Returns:

Type Description
list[Any]

List of all retrieval metrics ordered by name.

get_by_name(name)

Retrieve a metric by its name.

Parameters:

Name Type Description Default
name str

The metric name to search for.

required

Returns:

Type Description
Any | None

The metric if found, None otherwise.

get_by_name_and_type(name, metric_type)

Retrieve a metric by its name and type.

Parameters:

Name Type Description Default
name str

The metric name to search for.

required
metric_type str

The metric type (retrieval or generation).

required

Returns:

Type Description
Any | None

The metric if found, None otherwise.

get_by_type(metric_type)

Retrieve all metrics of a specific type.

Parameters:

Name Type Description Default
metric_type str

The metric type (retrieval or generation).

required

Returns:

Type Description
list[Any]

List of metrics of the specified type.

get_with_all_relations(metric_id)

Retrieve a metric with all relations eagerly loaded.

Parameters:

Name Type Description Default
metric_id int | str

The metric ID.

required

Returns:

Type Description
Any | None

The metric with all relations loaded, None if not found.

get_with_summaries(metric_id)

Retrieve a metric with its summaries eagerly loaded.

Parameters:

Name Type Description Default
metric_id int | str

The metric ID.

required

Returns:

Type Description
Any | None

The metric with summaries loaded, None if not found.

search_by_name(search_text, limit=10)

Search metrics containing the specified text in their name.

Parameters:

Name Type Description Default
search_text str

Text to search for in metric names.

required
limit int

Maximum number of results to return.

10

Returns:

Type Description
list[Any]

List of metrics containing the search text.

PageRepository

Bases: GenericRepository

Repository for Page entity with specialized queries.

__init__(session, model_cls=None)

Initialize page repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The Page model class. If None, uses default schema.

None

count_by_document(document_id)

Count the number of pages in a document.

Parameters:

Name Type Description Default
document_id int | str

The document ID.

required

Returns:

Type Description
int

Number of pages in the document.

get_all_with_document(limit=None, offset=None)

Retrieve all pages with their documents eagerly loaded.

Parameters:

Name Type Description Default
limit int | None

Maximum number of results to return.

None
offset int | None

Number of results to skip.

None

Returns:

Type Description
list[Any]

List of pages with documents loaded.

get_by_document_and_page_num(document_id, page_num)

Retrieve a specific page by document ID and page number.

Parameters:

Name Type Description Default
document_id int | str

The document ID.

required
page_num int

The page number.

required

Returns:

Type Description
Any | None

The page if found, None otherwise.

get_by_document_id(document_id)

Retrieve all pages for a specific document.

Parameters:

Name Type Description Default
document_id int | str

The document ID.

required

Returns:

Type Description
list[Any]

List of pages belonging to the document.

get_page_range(document_id, start_page, end_page)

Retrieve a range of pages from a document.

Parameters:

Name Type Description Default
document_id int | str

The document ID.

required
start_page int

The starting page number (inclusive).

required
end_page int

The ending page number (inclusive).

required

Returns:

Type Description
list[Any]

List of pages in the specified range.
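A pure-Python analogue of the inclusive range filter, using hypothetical page rows:

```python
pages = [{"document_id": 7, "page_num": n} for n in range(1, 11)]

def page_range(pages, document_id, start_page, end_page):
    """Filter one document's pages within [start_page, end_page], inclusive."""
    return sorted(
        p["page_num"] for p in pages
        if p["document_id"] == document_id and start_page <= p["page_num"] <= end_page
    )

selected = page_range(pages, document_id=7, start_page=3, end_page=5)  # [3, 4, 5]
```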

get_with_chunks(page_id)

Retrieve a page with its chunks eagerly loaded.

Parameters:

Name Type Description Default
page_id int | str

The page ID.

required

Returns:

Type Description
Any | None

The page with chunks loaded, None if not found.

get_with_document(page_id)

Retrieve a page with its document eagerly loaded.

Parameters:

Name Type Description Default
page_id int | str

The page ID.

required

Returns:

Type Description
Any | None

The page with document loaded, None if not found.

get_with_image_chunks(page_id)

Retrieve a page with its image chunks eagerly loaded.

Parameters:

Name Type Description Default
page_id int | str

The page ID.

required

Returns:

Type Description
Any | None

The page with image chunks loaded, None if not found.

get_with_page_chunk_relations(page_id)

Retrieve a page with its page-chunk relations eagerly loaded.

Parameters:

Name Type Description Default
page_id int | str

The page ID.

required

Returns:

Type Description
Any | None

The page with page-chunk relations loaded, None if not found.

search_by_metadata(metadata_key, metadata_value)

Search pages by metadata field.

Parameters:

Name Type Description Default
metadata_key str

The key in the JSONB metadata field.

required
metadata_value str

The value to search for.

required

Returns:

Type Description
list[Any]

List of matching pages.

PipelineRepository

Bases: GenericRepository[Any]

Repository for Pipeline entity with relationship loading capabilities.

__init__(session, model_cls=None)

Initialize pipeline repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The Pipeline model class to use. If None, uses default schema.

None

exists_by_name(name)

Check if a pipeline with the given name exists.

Parameters:

Name Type Description Default
name str

The pipeline name to check.

required

Returns:

Type Description
bool

True if pipeline exists, False otherwise.

get_all_ordered_by_name()

Retrieve all pipelines ordered by name.

Returns:

Type Description
list[Any]

List of all pipelines ordered alphabetically by name.

get_by_config_key(key, value)

Retrieve pipelines with a specific config key-value pair.

Parameters:

Name Type Description Default
key str

The config key to search for.

required
value str | int | float | bool

The config value to match.

required

Returns:

Type Description
list[Any]

List of pipelines with matching config.

Note

Uses JSONB containment operator (@>) for efficient config searching.
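In Python terms, the JSONB containment test behaves like this sketch (the pipeline rows are hypothetical):

```python
def config_contains(config: dict, key: str, value) -> bool:
    """Python analogue of the JSONB containment check: config @> {key: value}."""
    return key in config and config[key] == value

pipelines = [
    {"name": "bm25_v1", "config": {"tokenizer": "bert", "top_k": 10}},
    {"name": "bm25_v2", "config": {"tokenizer": "gemma2b", "top_k": 10}},
]
hits = [p["name"] for p in pipelines if config_contains(p["config"], "tokenizer", "bert")]
# ["bm25_v1"]
```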

get_by_name(name)

Retrieve a pipeline by name.

Parameters:

Name Type Description Default
name str

The pipeline name.

required

Returns:

Type Description
Any | None

The pipeline if found, None otherwise.

get_with_all_relations(pipeline_id)

Retrieve a pipeline with all relations eagerly loaded.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
Any | None

The pipeline with all relations loaded, None if not found.

get_with_executor_results(pipeline_id)

Retrieve a pipeline with its executor results eagerly loaded.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
Any | None

The pipeline with executor results loaded, None if not found.

get_with_retrieved_results(pipeline_id)

Retrieve a pipeline with its retrieved results eagerly loaded.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
Any | None

The pipeline with chunk and image chunk retrieved results loaded, None if not found.

get_with_summaries(pipeline_id)

Retrieve a pipeline with its summaries eagerly loaded.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID.

required

Returns:

Type Description
Any | None

The pipeline with summaries loaded, None if not found.

search_by_name(name_pattern)

Search pipelines by name pattern (case-insensitive).

Parameters:

Name Type Description Default
name_pattern str

The name pattern to search for (supports SQL LIKE wildcards).

required

Returns:

Type Description
list[Any]

List of pipelines matching the pattern.

QueryRepository

Bases: BaseVectorRepository[Any], BaseEmbeddingRepository[Any]

Repository for Query entity with relationship loading and vector search capabilities.

__init__(session, model_cls=None)

Initialize query repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required
model_cls type | None

The Query model class to use. If None, uses default schema.

None

count_all()

Count total number of queries.

Returns:

Type Description
int

Total count of queries.

count_by_generation_gt_size(size)

Count queries with a specific number of generation ground truths.

Parameters:

Name Type Description Default
size int

The number of ground truths to match.

required

Returns:

Type Description
int

Count of queries with the specified number of ground truths.

find_by_contents(contents)

Find query by exact text content match.

If multiple queries have the same content, returns the first one found.

Parameters:

Name Type Description Default
contents str

The exact query text content to find.

required

Returns:

Type Description
Any | None

The first matching query if found, None otherwise.

get_all_ids(limit, offset=0)

Get all query IDs with pagination.

Parameters:

Name Type Description Default
limit int

Maximum number of query IDs to return.

required
offset int

Number of query IDs to skip.

0

Returns:

Type Description
list[int | str]

List of query IDs ordered by ID.

get_by_query_text(query_text)

Retrieve a query by its text content.

Parameters:

Name Type Description Default
query_text str

The query text to search for.

required

Returns:

Type Description
Any | None

The query if found, None otherwise.

get_queries_with_empty_content(limit=100)

Retrieve queries with empty or whitespace-only content.

Parameters:

Name Type Description Default
limit int

Maximum number of queries to retrieve.

100

Returns:

Type Description
list[Any]

List of queries with empty content.

get_queries_with_generation_gt()

Retrieve all queries that have generation ground truth.

Returns:

Type Description
list[Any]

List of queries with generation ground truth.

get_with_all_relations(query_id)

Retrieve a query with all relations eagerly loaded.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required

Returns:

Type Description
Any | None

The query with all relations loaded, None if not found.

get_with_executor_results(query_id)

Retrieve a query with its executor results eagerly loaded.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required

Returns:

Type Description
Any | None

The query with executor results loaded, None if not found.

get_with_retrieval_relations(query_id)

Retrieve a query with its retrieval relations eagerly loaded.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required

Returns:

Type Description
Any | None

The query with retrieval relations loaded, None if not found.

search_by_query_text(search_text, limit=10)

Search queries containing the specified text.

Parameters:

Name Type Description Default
search_text str

Text to search for in query content.

required
limit int

Maximum number of results to return.

10

Returns:

Type Description
list[Any]

List of queries containing the search text.

SummaryRepository

Bases: GenericRepository[Summary]

Repository for Summary entity with composite key support.

__init__(session)

Initialize summary repository.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session for database operations.

required

compare_pipelines_by_metric(pipeline_ids, metric_id)

Compare multiple pipelines on a specific metric.

Parameters:

Name Type Description Default
pipeline_ids list[int]

List of pipeline IDs to compare.

required
metric_id int

The metric ID to compare on.

required

Returns:

Type Description
list[Summary]

List of summaries for the specified pipelines and metric, ordered by metric result.

delete_by_composite_key(pipeline_id, metric_id)

Delete a summary by its composite primary key.

Parameters:

Name Type Description Default
pipeline_id int

The pipeline ID.

required
metric_id int

The metric ID.

required

Returns:

Type Description
bool

True if the summary was deleted, False if not found.

exists_by_composite_key(pipeline_id, metric_id)

Check if a summary exists with the given composite key.

Parameters:

Name Type Description Default
pipeline_id int

The pipeline ID.

required
metric_id int

The metric ID.

required

Returns:

Type Description
bool

True if the summary exists, False otherwise.

get_by_composite_key(pipeline_id, metric_id)

Retrieve a summary by its composite primary key.

Parameters:

Name Type Description Default
pipeline_id int

The pipeline ID.

required
metric_id int

The metric ID.

required

Returns:

Type Description
Summary | None

The summary if found, None otherwise.

get_by_execution_time_range(pipeline_id, min_time, max_time)

Retrieve summaries within an execution time range.

Parameters:

Name Type Description Default
pipeline_id int

The pipeline ID.

required
min_time int

Minimum execution time (inclusive).

required
max_time int

Maximum execution time (inclusive).

required

Returns:

Type Description
list[Summary]

List of summaries within the specified time range.

get_by_metric_id(metric_id)

Retrieve all summaries for a specific metric.

Parameters:

Name Type Description Default
metric_id int

The metric ID.

required

Returns:

Type Description
list[Summary]

List of summaries for the metric.

get_by_metric_result_range(metric_id, min_result, max_result)

Retrieve summaries within a metric result range.

Parameters:

Name Type Description Default
metric_id int

The metric ID.

required
min_result float

Minimum metric result value (inclusive).

required
max_result float

Maximum metric result value (inclusive).

required

Returns:

Type Description
list[Summary]

List of summaries within the specified range.

get_by_pipeline_id(pipeline_id)

Retrieve all summaries for a specific pipeline.

Parameters:

Name Type Description Default
pipeline_id int

The pipeline ID.

required

Returns:

Type Description
list[Summary]

List of summaries for the pipeline.

get_metric_summaries_with_relations(metric_id)

Retrieve all summaries for a metric with relations eagerly loaded.

Parameters:

Name Type Description Default
metric_id int

The metric ID.

required

Returns:

Type Description
list[Summary]

List of summaries with pipeline and metric loaded.

get_pipeline_summaries_with_relations(pipeline_id)

Retrieve all summaries for a pipeline with relations eagerly loaded.

Parameters:

Name Type Description Default
pipeline_id int

The pipeline ID.

required

Returns:

Type Description
list[Summary]

List of summaries with pipeline and metric loaded.

get_top_pipelines_by_metric(metric_id, limit=10, ascending=False)

Retrieve top performing pipelines for a specific metric.

Parameters:

Name Type Description Default
metric_id int

The metric ID.

required
limit int

Maximum number of results to return.

10
ascending bool

If True, sort ascending (lower is better), otherwise descending (higher is better).

False

Returns:

Type Description
list[Summary]

List of summaries ordered by metric result.

get_with_all_relations(pipeline_id, metric_id)

Retrieve a summary with all relations eagerly loaded.

Parameters:

Name Type Description Default
pipeline_id int

The pipeline ID.

required
metric_id int

The metric ID.

required

Returns:

Type Description
Summary | None

The summary with all relations loaded, None if not found.

TextOnlyUnitOfWork

Bases: BaseUnitOfWork

Text-only Unit of Work for managing text data ingestion transactions.

This UoW focuses on text-based entities only (Query, Chunk, RetrievalRelation) and excludes image-related tables like ImageChunk.

Provides lazy-initialized repositories for efficient resource usage.

chunks property

Get the Chunk repository.

Returns:

Type Description
ChunkRepository

ChunkRepository instance.

Raises:

Type Description
SessionNotSetError

If session is not initialized.

queries property

Get the Query repository.

Returns:

Type Description
QueryRepository

QueryRepository instance.

Raises:

Type Description
SessionNotSetError

If session is not initialized.

retrieval_relations property

Get the RetrievalRelation repository.

Returns:

Type Description
RetrievalRelationRepository

RetrievalRelationRepository instance.

Raises:

Type Description
SessionNotSetError

If session is not initialized.

__init__(session_factory, schema=None)

Initialize Text-only Unit of Work with a session factory.

Parameters:

Name Type Description Default
session_factory sessionmaker

SQLAlchemy sessionmaker instance.

required
schema Any | None

Schema namespace from create_schema(). If None, uses default 768-dim schema.

None

UnitOfWork

Unit of Work pattern for managing database transactions.

Ensures data consistency by grouping multiple repository operations into a single atomic transaction.

__enter__()

Enter the context manager and create a new session.

Returns:

Type Description
UnitOfWork

Self for method chaining.

__exit__(exc_type, exc_val, exc_tb)

Exit the context manager and clean up session.

Automatically rolls back if an exception occurred.

Parameters:

Name Type Description Default
exc_type Any

Exception type if an error occurred.

required
exc_val Any

Exception value if an error occurred.

required
exc_tb Any

Exception traceback if an error occurred.

required

__init__(session_factory)

Initialize Unit of Work with a session factory.

Parameters:

Name Type Description Default
session_factory Any

SQLAlchemy sessionmaker instance.

required

commit()

Commit the current transaction.

flush()

Flush pending changes without committing.

rollback()

Rollback the current transaction.
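The transaction behavior described above can be sketched as a minimal, self-contained Unit of Work. This is a toy stand-in, not the library's implementation: FakeSession substitutes for a SQLAlchemy session, and all names are illustrative.

```python
# Toy session standing in for a SQLAlchemy session; names are illustrative only.
class FakeSession:
    def __init__(self):
        self.committed = False
        self.rolled_back = False
        self.closed = False

    def commit(self):
        self.committed = True

    def rollback(self):
        self.rolled_back = True

    def close(self):
        self.closed = True


class SketchUnitOfWork:
    def __init__(self, session_factory):
        self._session_factory = session_factory
        self.session = None

    def __enter__(self):
        # Create a fresh session on entry; return self for method chaining.
        self.session = self._session_factory()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Roll back automatically if an exception occurred, then clean up.
        if exc_type is not None:
            self.rollback()
        self.session.close()

    def commit(self):
        self.session.commit()

    def rollback(self):
        self.session.rollback()
```

With this sketch, a `with SketchUnitOfWork(factory) as uow:` block that calls `uow.commit()` commits and closes the session; raising inside the block rolls back instead, and the exception still propagates.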

Service

autorag_research.orm.service

BaseEvaluationService

Bases: BaseService, ABC

Abstract base class for evaluation services.

Provides common patterns for evaluation workflows:

1. Fetch execution results in batches using Generator (abstract)
2. Filter missing query IDs that need evaluation (abstract)
3. Compute metrics with batch processing (base)
4. Save evaluation results (abstract)

The service supports:

  • Setting and changing metric functions dynamically
  • Batch processing with configurable batch size
  • Generator-based pagination to minimize memory usage and transaction issues

Example
service = RetrievalEvaluationService(session_factory, schema)

# Set metric and evaluate
service.set_metric(metric_id=1, metric_func=my_metric_func)
service.evaluate(pipeline_id=1, batch_size=100)

# Change metric and evaluate again
service.set_metric(metric_id=2, metric_func=another_metric_func)
service.evaluate(pipeline_id=1, batch_size=100)

metric_func property

Get current metric function.

metric_id property

Get current metric ID.

__init__(session_factory, schema=None)

Initialize the evaluation service.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy sessionmaker for database connections.

required
schema Any | None

Schema namespace from create_schema(). If None, uses default 768-dim schema.

None

evaluate(pipeline_id, batch_size=100)

Run the full evaluation pipeline for the current metric.

This method uses Generator-based pagination to process query IDs:

1. Iterates through query ID batches using limit/offset
2. Filters to only those missing evaluation results
3. Fetches execution results for the batch
4. Computes metrics in batch
5. Saves results to database

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID to evaluate.

required
batch_size int

Number of queries to process per batch.

100

Returns:

Type Description
tuple[int, float | None]

Tuple of (queries_evaluated, average_score); average_score is None if no queries were evaluated.

Raises:

Type Description
ValueError

If metric is not set.

get_metric(metric_name, metric_type=None)

Get metric by name and optionally type.

Parameters:

Name Type Description Default
metric_name str

The name of the metric.

required
metric_type str | None

Optional metric type filter ('retrieval' or 'generation').

None

Returns:

Type Description
Any | None

The Metric entity if found, None otherwise.

get_or_create_metric(name, metric_type)

Get existing metric or create a new one.

Parameters:

Name Type Description Default
name str

The metric name.

required
metric_type str

The metric type ('retrieval' or 'generation').

required

Returns:

Type Description
int | str

The metric ID.

is_evaluation_complete(pipeline_id, metric_id, batch_size=100)

Check if evaluation is complete for all queries.

Iterates through all query IDs and checks:

1. Each query has execution results
2. Each query has evaluation results for the given pipeline and metric

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID to check.

required
metric_id int | str

The metric ID to check.

required
batch_size int

Number of queries to check per batch.

100

Returns:

Type Description
bool

True if all queries have both execution and evaluation results, False otherwise (returns immediately on the first missing result).

set_metric(metric_id, metric_func, compute_granularity='query')

Set the metric ID and function for evaluation.

Parameters:

Name Type Description Default
metric_id int | str

The ID of the metric in the database.

required
metric_func MetricFunc

Function that takes list[MetricInput] and returns list[float | None].

required
compute_granularity Literal['query', 'dataset']

Metric compute granularity ("query" or "dataset").

'query'
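A metric function passed to set_metric must accept a list of inputs and return one score (or None) per input. The sketch below is illustrative only: the real MetricInput class is not documented here, so the stand-in dataclass and its `retrieval_gt` / `retrieved_ids` field names are assumptions.

```python
from dataclasses import dataclass, field


# Hypothetical stand-in for MetricInput; the real class may differ.
@dataclass
class FakeMetricInput:
    retrieval_gt: list = field(default_factory=list)   # ground-truth chunk IDs (assumed name)
    retrieved_ids: list = field(default_factory=list)  # retrieved chunk IDs (assumed name)


def recall_metric(inputs):
    """Return one recall score per input; None when there is no ground truth."""
    scores = []
    for inp in inputs:
        if not inp.retrieval_gt:
            scores.append(None)  # nothing to evaluate against
            continue
        hits = len(set(inp.retrieval_gt) & set(inp.retrieved_ids))
        scores.append(hits / len(inp.retrieval_gt))
    return scores
```

A function with this shape could then be registered via `service.set_metric(metric_id=..., metric_func=recall_metric)`.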

verify_pipeline_completion(pipeline_id, batch_size=100)

Verify all queries have execution results for the pipeline.

Iterates through query IDs in batches and checks each batch has results. Returns False immediately when any query is missing results.

Parameters:

Name Type Description Default
pipeline_id int | str

The pipeline ID to verify.

required
batch_size int

Number of queries to check per batch.

100

Returns:

Type Description
bool

True if all queries have results, False otherwise.

Raises:

Type Description
NoQueryInDBError

If no queries exist in the database.

BasePipelineService

Bases: BaseService, ABC

Abstract base for pipeline services with shared pipeline management.

Provides:

  • get_or_create_pipeline(): Idempotent pipeline creation with resume support
  • get_pipeline_config(): Pipeline config retrieval by ID

Subclasses must implement _create_uow() and _get_schema_classes() as required by BaseService. The UoW returned by _create_uow() must expose a pipelines property (PipelineRepository).

get_or_create_pipeline(name, config, *, strict=False)

Get existing pipeline by name or create a new one.

If a pipeline with the given name already exists, returns its ID. If the existing pipeline has a different config, behavior depends on strict:

  • strict=False (default): logs a warning and reuses the existing pipeline.
  • strict=True: raises ValueError.

If no pipeline exists, creates a new one.

Parameters:

Name Type Description Default
name str

Name for this pipeline (used as experiment identifier).

required
config dict

Configuration dictionary for the pipeline.

required
strict bool

When True, raise ValueError on config mismatch instead of warning.

False

Returns:

Type Description
tuple[int | str, bool]

Tuple of (pipeline_id, is_new) where is_new is True if a new pipeline was created.

Raises:

Type Description
ValueError

If strict=True and an existing pipeline has a different config.

get_pipeline_config(pipeline_id)

Get pipeline configuration by ID.

Parameters:

Name Type Description Default
pipeline_id int | str

ID of the pipeline.

required

Returns:

Type Description
dict[Any, Any] | None

Pipeline config dict if found, None otherwise.

GenerationEvaluationService

Bases: BaseEvaluationService

Service for evaluating generation pipelines.

This service handles the evaluation workflow for generation pipelines:

1. Fetch queries and ground truth (Query.generation_gt)
2. Fetch generation results (ExecutorResult.generation_result)
3. Compute evaluation metrics (e.g., BLEU, ROUGE, F1)
4. Store results in EvaluationResult table

The service uses MetricInput to pass data to metric functions, which should accept list[MetricInput] and return list[float | None].

Example
from autorag_research.orm.service import GenerationEvaluationService

# Create service
service = GenerationEvaluationService(session_factory, schema)

# Get or create metric
metric_id = service.get_or_create_metric("bleu", "generation")

# Set metric and evaluate
service.set_metric(metric_id=metric_id, metric_func=bleu_score)
count, avg = service.evaluate(pipeline_id=1, batch_size=100)
print(f"Evaluated {count} queries, average={avg}")

GenerationPipelineService

Bases: BasePipelineService

Service for running generation pipelines.

This service handles the common workflow for all generation pipelines:

1. Create a pipeline instance
2. Fetch queries from database
3. Run generation using the provided function (which handles retrieval internally)
4. Store results in ExecutorResult table

The actual generation logic (including retrieval) is provided as a function parameter, making this service reusable for NaiveRAG, iterative RAG, etc.

Example
from autorag_research.orm.service.generation_pipeline import GenerationPipelineService

# Create service
service = GenerationPipelineService(session_factory, schema)

# Create or resume pipeline
pipeline_id, is_new = service.get_or_create_pipeline(
    name="naive_rag_v1",
    config={"type": "naive_rag", "llm_model": "gpt-4"},
)

# Run pipeline with async generation function
results = service.run_pipeline(
    generate_func=my_async_generate_func,  # Async: handles retrieval + generation
    pipeline_id=pipeline_id,
    top_k=10,
)

delete_pipeline_results(pipeline_id)

Delete all generation results for a specific pipeline.

Parameters:

Name Type Description Default
pipeline_id int | str

ID of the pipeline.

required

Returns:

Type Description
int

Number of deleted records.

get_chunk_contents(chunk_ids)

Get chunk contents by IDs.

Parameters:

Name Type Description Default
chunk_ids list[int | str]

List of chunk IDs to fetch.

required

Returns:

Type Description
list[str]

List of chunk content strings in the same order as input IDs.

get_image_chunk_contents(image_chunk_ids)

Fetch image chunk contents (bytes, mimetype) by IDs.

Parameters:

Name Type Description Default
image_chunk_ids list[int | str]

List of image chunk IDs.

required

Returns:

Type Description
list[tuple[bytes, str]]

List of (image_bytes, mimetype) tuples in the same order as the input IDs. Missing chunks return (b"", "image/png").

get_query_text(query_id)

Get the text of a query by its ID.

Parameters:

Name Type Description Default
query_id int | str

The ID of the query.

required

Returns:

Type Description
str

The text of the query.

Raises:

Type Description
ValueError

If the query with the given ID is not found.

run_pipeline(generate_func, pipeline_id, top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)

Run generation pipeline for all queries with parallel execution and retry.

Parameters:

Name Type Description Default
generate_func GenerateFunc

Async function that performs retrieval + generation. Signature: async (query_id: int, top_k: int) -> GenerationResult. The function should handle retrieval internally.

required
pipeline_id int | str

ID of the pipeline.

required
top_k int

Number of top documents to retrieve per query.

10
batch_size int

Number of queries to fetch from DB at once.

128
max_concurrency int

Maximum number of concurrent async operations.

16
max_retries int

Maximum number of retry attempts for failed queries.

3
retry_delay float

Base delay in seconds for exponential backoff between retries.

1.0
query_limit int | None

Maximum number of queries to process. None means no limit.

None

Returns:

Type Description
dict[str, Any]

Dictionary with pipeline execution statistics:

  • pipeline_id: The pipeline ID
  • total_queries: Number of queries processed successfully
  • token_usage: Aggregated token usage dict (prompt_tokens, completion_tokens, total_tokens, embedding_tokens)
  • avg_execution_time_ms: Average execution time per query
  • failed_queries: List of query IDs that failed after all retries
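A generate_func compatible with the documented signature might look like the following sketch. The real GenerationResult type is not shown in this reference, so the stand-in dataclass and its fields are assumptions; replace them with the library's actual types.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-in for the library's GenerationResult; field names are assumed.
@dataclass
class FakeGenerationResult:
    query_id: int
    generated_text: str


async def my_generate_func(query_id: int, top_k: int) -> FakeGenerationResult:
    # A real implementation would retrieve top_k chunks for the query
    # and call an LLM; both steps are faked here.
    retrieved = [f"chunk-{i}" for i in range(top_k)]
    answer = f"answer for query {query_id} using {len(retrieved)} chunks"
    return FakeGenerationResult(query_id=query_id, generated_text=answer)
```

Such a function would be passed as `generate_func` to run_pipeline, which drives it concurrently over all queries.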

MultiModalIngestionService

Bases: BaseIngestionService

Service for multi-modal data ingestion operations.

This service provides batch-only methods for ingesting multi-modal RAG datasets. Users can access repositories directly via UoW for basic CRUD operations.

Design Principles:

  • Batch-only methods (no single-add methods)
  • No simple wrappers around repository functions
  • Value-added operations with transaction management and validation
  • Mixed multi-hop support for retrieval ground truth

Example
from autorag_research.orm.connection import DBConnection
from autorag_research.orm.service import MultiModalIngestionService

# Setup database connection
db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

# Initialize service
service = MultiModalIngestionService(session_factory)

# Read image file as bytes
with open("/path/to/image1.jpg", "rb") as f:
    image_bytes = f.read()

# Batch add files
file_ids = service.add_files([
    {"path": "/path/to/image1.jpg", "file_type": "image"},
    {"path": "/path/to/document1.pdf", "file_type": "raw"},
])

# Batch add image chunks using the bytes read above
image_chunk_ids = service.add_image_chunks([
    {"content": image_bytes, "mimetype": "image/jpeg"},
])

add_documents(documents)

Batch add documents to the database.

Parameters:

Name Type Description Default
documents list[dict]

List of dicts with keys:

  • filename (str | None)
  • title (str | None)
  • author (str | None)
  • filepath_id (int | None) - FK to File
  • metadata (dict | None) - JSONB metadata

required

Returns:

Type Description
list[int | str]

List of created Document IDs.

add_files(files)

Batch add files to the database.

Parameters:

Name Type Description Default
files list[dict[str, str]]

List of dicts with keys path and file_type. file_type can be one of: "raw", "image", "audio", "video".

required

Returns:

Type Description
list[int | str]

List of created File IDs.

add_image_chunks(image_chunks)

Batch add image chunks to the database.

Uses memory-efficient bulk insert (SQLAlchemy Core) instead of ORM objects. This reduces memory usage by ~3-5x for large batches.

Parameters:

Name Type Description Default
image_chunks list[dict[str, bytes | str | int | None]]

List of dicts with keys:

  • content (bytes) - Image binary data (required)
  • mimetype (str) - Image MIME type, e.g., "image/png" (required)
  • parent_page_id (int | None) - FK to Page (optional)

required

Returns:

Type Description
list[int | str]

List of created ImageChunk IDs.

add_pages(pages)

Batch add pages to the database.

Parameters:

Name Type Description Default
pages list[dict]

List of dicts with keys:

  • document_id (int) - FK to Document (required)
  • page_num (int) - Page number (required)
  • image_content (bytes | None) - Image binary data
  • mimetype (str | None) - Image MIME type (e.g., "image/png")
  • metadata (dict | None) - JSONB metadata

required

Returns:

Type Description
list[int | str]

List of created Page IDs.

embed_all_image_chunks(embed_func, batch_size=100, max_concurrency=10)

Embed all image chunks that don't have embeddings.

Parameters:

Name Type Description Default
embed_func ImageEmbeddingFunc

Async function that takes image bytes and returns embedding vector.

required
batch_size int

Number of image chunks to process per batch.

100
max_concurrency int

Maximum concurrent embedding calls.

10

Returns:

Type Description
int

Total number of image chunks successfully embedded.

embed_all_image_chunks_multi_vector(embed_func, batch_size=100, max_concurrency=10)

Embed all image chunks that don't have multi-vector embeddings.

Parameters:

Name Type Description Default
embed_func ImageMultiVectorEmbeddingFunc

Async function that takes image bytes and returns multi-vector embedding.

required
batch_size int

Number of image chunks to process per batch.

100
max_concurrency int

Maximum concurrent embedding calls.

10

Returns:

Type Description
int

Total number of image chunks successfully embedded.

get_statistics()

Get statistics about the ingested data.

Returns:

Type Description
dict

Dictionary with counts for all entity types and embedding status.

set_image_chunk_embeddings(image_chunk_ids, embeddings)

Batch set embeddings for image chunks.

Parameters:

Name Type Description Default
image_chunk_ids list[int | str]

List of image chunk IDs.

required
embeddings list[list[float]]

List of embedding vectors (must match image_chunk_ids length).

required

Returns:

Type Description
int

Total number of image chunks successfully updated.

Raises:

Type Description
LengthMismatchError

If image_chunk_ids and embeddings have different lengths.

set_image_chunk_multi_embeddings(image_chunk_ids, embeddings)

Batch set multi-vector embeddings for image chunks.

Parameters:

Name Type Description Default
image_chunk_ids list[int | str]

List of image chunk IDs.

required
embeddings list[list[list[float]]]

List of multi-vector embeddings (list of list of floats per image chunk).

required

Returns:

Type Description
int

Total number of image chunks successfully updated.

Raises:

Type Description
LengthMismatchError

If image_chunk_ids and embeddings have different lengths.

RetrievalEvaluationService

Bases: BaseEvaluationService

Service for evaluating retrieval pipelines.

This service handles the evaluation workflow for retrieval pipelines:

1. Fetch queries and ground truth (RetrievalRelation)
2. Fetch retrieval results (ChunkRetrievedResult)
3. Compute evaluation metrics (e.g., Recall@K, Precision@K, MRR)
4. Store results in EvaluationResult table

The service uses MetricInput to pass data to metric functions, which should accept list[MetricInput] and return list[float | None].

Example
from autorag_research.orm.service import RetrievalEvaluationService
from autorag_research.evaluation.metrics.retrieval import retrieval_recall

# Create service
service = RetrievalEvaluationService(session_factory, schema)

# Get or create metric
metric_id = service.get_or_create_metric("recall@10", "retrieval")

# Set metric and evaluate
service.set_metric(metric_id=metric_id, metric_func=retrieval_recall)
count, avg = service.evaluate(pipeline_id=1, batch_size=100)
print(f"Evaluated {count} queries, average={avg}")

# Evaluate another metric
metric_id_2 = service.get_or_create_metric("precision@10", "retrieval")
service.set_metric(metric_id=metric_id_2, metric_func=retrieval_precision)
service.evaluate(pipeline_id=1)

RetrievalPipelineService

Bases: BasePipelineService

Service for running retrieval pipelines.

This service handles the common workflow for all retrieval pipelines:

1. Create a pipeline instance
2. Fetch queries from database
3. Run retrieval using the provided retrieval function
4. Store results in ChunkRetrievedResult table

The actual retrieval logic is provided as a function parameter, making this service reusable for BM25, dense retrieval, hybrid, etc.

Example
from autorag_research.orm.service import RetrievalPipelineService

# Create service
service = RetrievalPipelineService(session_factory, schema)

# Direct search (for single-query use cases)
results = service.bm25_search(query_ids=[1, 2, 3], top_k=10)
results = service.vector_search(query_ids=[1, 2, 3], top_k=10)

# Or use run_pipeline for batch processing with result persistence
pipeline_id, is_new = service.get_or_create_pipeline(
    name="bm25",
    config={"type": "bm25", "tokenizer": "bert"},
)
# run_pipeline expects an async per-query function (see RetrievalFunc below)
async def bm25_func(query_id, top_k):
    return service.bm25_search([query_id], top_k)[0]

stats = service.run_pipeline(
    retrieval_func=bm25_func,
    pipeline_id=pipeline_id,
    top_k=10,
)

bm25_search(query_ids, top_k=10, tokenizer='bert', index_name='idx_chunk_bm25')

Execute BM25 retrieval for given query IDs.

Uses VectorChord-BM25 full-text search on the chunks table.

Parameters:

Name Type Description Default
query_ids list[int | str]

List of query IDs to search for.

required
top_k int

Number of top results to return per query.

10
tokenizer str

Tokenizer to use for BM25 (default: "bert").

'bert'
index_name str

Name of the BM25 index (default: "idx_chunk_bm25").

'idx_chunk_bm25'

Returns:

Type Description
list[list[dict[str, Any]]]

List of result lists, one per query. Each result dict contains:

  • doc_id: Chunk ID
  • score: BM25 relevance score
  • content: Chunk text content

Raises:

Type Description
ValueError

If a query ID is not found in the database.

bm25_search_by_text(query_text, top_k=10, tokenizer='bert', index_name='idx_chunk_bm25')

Execute BM25 retrieval using raw query text (no Query entity needed).

Parameters:

Name Type Description Default
query_text str

The query text to search for.

required
top_k int

Number of top results to return.

10
tokenizer str

Tokenizer to use for BM25 (default: "bert").

'bert'
index_name str

Name of the BM25 index (default: "idx_chunk_bm25").

'idx_chunk_bm25'

Returns:

Type Description
list[dict[str, Any]]

List of result dicts containing doc_id, score, and content.

delete_pipeline_results(pipeline_id)

Delete all retrieval results for a specific pipeline.

Parameters:

Name Type Description Default
pipeline_id int | str

ID of the pipeline.

required

Returns:

Type Description
int

Number of deleted records.

fetch_query_texts(query_ids)

Batch fetch query texts from database.

Parameters:

Name Type Description Default
query_ids list[int | str]

List of query IDs to fetch.

required

Returns:

Type Description
list[str]

List of query text contents in the same order as query_ids.

Raises:

Type Description
ValueError

If a query ID is not found.

find_query_by_text(query_text)

Find existing query by text content.

Parameters:

Name Type Description Default
query_text str

The query text to search for.

required

Returns:

Type Description
Any | None

The query if found, None otherwise.

run_pipeline(retrieval_func, pipeline_id, top_k=10, batch_size=128, max_concurrency=16, max_retries=3, retry_delay=1.0, query_limit=None)

Run retrieval pipeline for all queries with parallel execution and retry.

Parameters:

Name Type Description Default
retrieval_func RetrievalFunc

Async function that performs retrieval for a single query. Signature: async (query_id: int | str, top_k: int) -> list[dict]. Each result dict must have 'doc_id' (int) and 'score' keys.

required
pipeline_id int | str

ID of the pipeline.

required
top_k int

Number of top documents to retrieve per query.

10
batch_size int

Number of queries to fetch from DB at once.

128
max_concurrency int

Maximum number of concurrent async operations.

16
max_retries int

Maximum number of retry attempts for failed queries.

3
retry_delay float

Base delay in seconds for exponential backoff between retries.

1.0
query_limit int | None

Maximum number of queries to process. None means no limit.

None

Returns:

Type Description
dict[str, Any]

Dictionary with pipeline execution statistics:

  • pipeline_id: The pipeline ID
  • total_queries: Number of queries processed successfully
  • total_results: Number of results stored
  • failed_queries: List of query IDs that failed after all retries
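A retrieval_func matching the documented per-query signature can be sketched as below. The toy corpus and scoring are illustrative only, but the returned shape (a list of dicts with 'doc_id' and 'score' keys) follows the contract stated above.

```python
import asyncio

# Toy corpus: doc_id -> content. Purely illustrative.
CORPUS = {1: "bm25 ranking", 2: "dense retrieval", 3: "hybrid search"}


async def my_retrieval_func(query_id, top_k: int) -> list:
    # A real implementation would look up the query text and search an index;
    # here every document gets a fake score derived from its ID.
    scored = [{"doc_id": doc_id, "score": 1.0 / doc_id, "content": text}
              for doc_id, text in CORPUS.items()]
    scored.sort(key=lambda r: r["score"], reverse=True)
    return scored[:top_k]
```

run_pipeline would call this once per query, with concurrency and retries handled by the service.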

vector_search(query_ids, top_k=10, search_mode='single')

Execute vector search for given query IDs.

Supports single-vector (cosine similarity) and multi-vector (MaxSim) search modes using VectorChord extension.

Parameters:

Name Type Description Default
query_ids list[int | str]

List of query IDs to search for.

required
top_k int

Number of top results to return per query.

10
search_mode Literal['single', 'multi']

"single" for dense retrieval, "multi" for late interaction.

'single'

Returns:

Type Description
list[list[dict[str, Any]]]

List of result lists, one per query. Each result dict contains:

  • doc_id: Chunk ID
  • score: Relevance score in [-1, 1] range (higher = more relevant)
      • single: 1 - cosine_distance (= cosine_similarity)
      • multi: MaxSim / n_query_vectors (normalized late interaction)
  • content: Chunk text content

Raises:

Type Description
ValueError

If a query ID is not found or lacks required embeddings.

vector_search_by_embedding(embedding, top_k=10)

Execute vector search using a provided embedding directly.

This method enables retrieval pipelines that generate embeddings dynamically (like HyDE) rather than using pre-computed query embeddings.

Parameters:

Name Type Description Default
embedding list[float]

The embedding vector to search with.

required
top_k int

Number of top results to return.

10

Returns:

Type Description
list[dict[str, Any]]

List of result dicts containing doc_id, score, and content. Score is cosine similarity in the [-1, 1] range.
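The cosine-similarity score described above can be reproduced with plain Python. This sketch scores a toy in-memory corpus rather than the database-backed index the service uses, but yields the same result shape and the same [-1, 1] score range.

```python
import math


def cosine_similarity(a, b) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


def toy_vector_search(embedding, corpus: dict, top_k: int = 10) -> list:
    # Same result shape as the service: doc_id + score, sorted best-first.
    results = [{"doc_id": doc_id, "score": cosine_similarity(embedding, vec)}
               for doc_id, vec in corpus.items()]
    results.sort(key=lambda r: r["score"], reverse=True)
    return results[:top_k]
```

This is the pattern a HyDE-style pipeline would follow: embed a hypothetical document on the fly, then search with that embedding instead of a pre-computed query embedding.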

TextDataIngestionService

Bases: BaseIngestionService

Service for text-only data ingestion operations.

Provides methods for:

  • Adding queries (with optional generation_gt)
  • Adding chunks (text-only, no parent page required)
  • Creating retrieval ground truth relations (with multi-hop support)
  • Setting embeddings for queries and chunks (accepts pre-computed vectors)
Example

Basic usage with queries, chunks, and retrieval ground truth:

from autorag_research.orm.connection import DBConnection
from autorag_research.orm.service import TextDataIngestionService

# Setup database connection
db = DBConnection.from_config()  # or DBConnection.from_env()
session_factory = db.get_session_factory()

# Initialize service
service = TextDataIngestionService(session_factory)

# Get statistics
stats = service.get_statistics()
print(stats)

clean()

Delete empty queries and chunks along with their associated retrieval relations.

This method should be called after data ingestion and before embedding to remove any queries or chunks with empty or whitespace-only content. It also removes associated retrieval relations to maintain referential integrity.

Returns:

Type Description
dict[str, int]

Dictionary with counts of deleted queries and chunks.

get_retrieval_gt_by_query(query_id)

Get all retrieval ground truth relations for a query.

Parameters:

Name Type Description Default
query_id int | str

The query ID.

required

Returns:

Type Description
list[Any]

List of RetrievalRelation entities ordered by group_index and group_order.

get_statistics()

Get statistics about the ingested data.

Returns:

Type Description
dict

Dictionary with counts of queries, chunks, and embeddings status.

Utilities

autorag_research.util

TokenUsageTracker

Collects token usage from LangChain LLM responses.

Usage::

tracker = TokenUsageTracker()
tracker.record(response)      # extract + store from LangChain response
result = tracker.total        # aggregated total across all calls
per_call = tracker.history    # per-call breakdown

history property

Per-call token usage breakdown (defensive copy).

total property

Aggregated total token usage across all recorded calls.

record(response)

Extract and record token usage from a LangChain LLM response.

Parameters:

Name Type Description Default
response Any

LangChain LLM response object (AIMessage, etc.)

required

Returns:

Type Description
dict[str, int] | None

Extracted usage dict, or None if not available.

aggregate_token_usage(current, new)

Aggregate two token usage dicts (accumulator pattern).

Parameters:

Name Type Description Default
current dict[str, int] | None

Current aggregated token usage (or None).

required
new dict[str, int] | None

New token usage to add (or None).

required

Returns:

Type Description
dict[str, int] | None

Aggregated token usage dict, or None if both inputs are None.

bytes_to_pil_image(image_bytes)

Convert image bytes to PIL Image.

Parameters:

Name Type Description Default
image_bytes bytes

Raw image bytes (PNG, JPEG, etc.)

required

Returns:

Type Description
Image

PIL Image object.

convert_inputs_to_list(func)

Decorator to convert all function inputs to Python lists.

Parameters:

Name Type Description Default
func Callable

The function to be decorated.

required

Returns:

Type Description
Callable

The wrapped function that converts all inputs to lists.

extract_image_from_data_uri(data_uri)

Extract image bytes and mimetype from a data URI.
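The parsing this implies can be sketched with the standard library. The helper name and exact splitting logic below are assumptions, not the actual source.

```python
import base64

def parse_data_uri(data_uri: str) -> tuple[bytes, str]:
    # "data:image/png;base64,iVBOR..." -> (raw bytes, "image/png")
    header, _, payload = data_uri.partition(",")
    mimetype = header.removeprefix("data:").split(";")[0]
    return base64.b64decode(payload), mimetype
```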

extract_token_logprobs(response, target_tokens=None)

Extract log probabilities from LangChain LLM response.

Works with any LangChain LLM that stores logprobs in response_metadata["logprobs"]["content"]. Compatible providers include:

  • OpenAI (ChatOpenAI, AzureChatOpenAI)
  • Together AI, Fireworks AI, Anyscale
  • Local models via vLLM, text-generation-inference, Ollama (with logprobs enabled)
  • Any OpenAI-compatible API endpoint

To enable logprobs, use provider-specific configuration:

  • OpenAI/vLLM: llm.bind(logprobs=True, top_logprobs=5)
  • Other providers: check the provider documentation for logprobs support

Each token entry in response.response_metadata["logprobs"]["content"] has: token, logprob, bytes, top_logprobs.

Parameters:

Name Type Description Default
response Any

LangChain AIMessage or similar response object.

required
target_tokens list[str] | None

If provided, only return logprobs for these tokens. Case-insensitive matching. If None, returns all token logprobs.

None

Returns:

Type Description
dict[str, float] | None

Dict mapping token strings to their log probability values. Returns None if logprobs are not available in the response.

Example

Enable logprobs on the LLM (OpenAI example)

llm = ChatOpenAI(model="gpt-4o-mini").bind(logprobs=True, top_logprobs=5)
response = llm.invoke("Answer Yes or No: Is the sky blue?")
logprobs = extract_token_logprobs(response, target_tokens=["Yes", "No"])

Returns: {"Yes": -0.0001, "No": -9.2} or None if not available

Note
  • log probability of 0.0 = 100% confidence
  • More negative = less likely
  • Convert to probability: exp(logprob)

image_chunk_to_pil_images(image_chunks)

Convert raw image bytes to PIL Images, skipping invalid ones.

Parameters:

Name Type Description Default
image_chunks list[tuple[bytes, str]]

List of (bytes, mimetype) tuples, e.g. the result of a get operation on the ImageChunk repository.

required

Returns:

Type Description
list[Image]

List of valid PIL Images.

load_image(img)

Convert any ImageType to a PIL Image in RGB mode.

Accepts file paths (str/Path), raw bytes, or BytesIO objects and returns a PIL Image converted to RGB.

Parameters:

Name Type Description Default
img ImageType

Image as file path (str/Path), raw bytes, or BytesIO.

required

Returns:

Type Description
Image

PIL Image in RGB mode.

Raises:

Type Description
TypeError

If img is not a supported type.

normalize_dbsf(scores)

3-sigma distribution-based score fusion normalization.

Normalizes using mean ± 3*std as bounds, then clips to [0, 1]. This method is robust to outliers and works well when combining scores from different distributions. None values are preserved and excluded from statistics calculation.

Reference: "Score Normalization in Multi-Engine Text Retrieval"

Parameters:

Name Type Description Default
scores list[float | None]

List of numeric scores to normalize. None values are preserved.

required

Returns:

Type Description
list[float | None]

List of normalized scores clipped to [0, 1] range, with None preserved.

Example

>>> normalize_dbsf([1.0, 2.0, 3.0, 4.0, 5.0])
[0.0, 0.25, 0.5, 0.75, 1.0]  # approximately
>>> normalize_dbsf([1.0, None, 3.0, 4.0, 5.0])
[0.0, None, 0.333..., 0.5, 0.666...]  # approximately

normalize_minmax(scores)

Min-max normalization to [0, 1] range.

Scales scores linearly so that the minimum becomes 0 and maximum becomes 1. If all scores are equal, returns a list of 0.5 values. None values are preserved and excluded from statistics calculation.

Parameters:

Name Type Description Default
scores list[float | None]

List of numeric scores to normalize. None values are preserved.

required

Returns:

Type Description
list[float | None]

List of normalized scores in [0, 1] range, with None preserved.

Example

>>> normalize_minmax([1.0, 2.0, 3.0])
[0.0, 0.5, 1.0]
>>> normalize_minmax([1.0, None, 3.0])
[0.0, None, 1.0]

normalize_string(s)

Lowercase text and remove punctuation, articles, and extra whitespace. Taken from the official evaluation script for v1.1 of the SQuAD dataset.
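The SQuAD v1.1 normalization steps are well known and can be sketched as follows (the function name here is illustrative; the official script applies lowercasing, punctuation removal, article removal, then whitespace squeezing, in that order):

```python
import re
import string

def squad_normalize(s: str) -> str:
    # Lowercase, drop punctuation, drop articles (a/an/the), squeeze whitespace.
    s = s.lower()
    s = "".join(ch for ch in s if ch not in string.punctuation)
    s = re.sub(r"\b(a|an|the)\b", " ", s)
    return " ".join(s.split())
```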

normalize_tmm(scores, theoretical_min)

Theoretical min-max normalization using theoretical min and actual max.

Uses the theoretical minimum bound and actual maximum from the data. This is useful when the minimum is known (e.g., 0 for BM25) but the maximum varies per query. None values are preserved and excluded from statistics calculation.

Parameters:

Name Type Description Default
scores list[float | None]

List of numeric scores to normalize. None values are preserved.

required
theoretical_min float

Known minimum possible score (e.g., 0 for BM25, -1 for cosine).

required

Returns:

Type Description
list[float | None]

List of normalized scores in [0, 1] range, with None preserved.

Example

>>> normalize_tmm([0.0, 50.0, 100.0], theoretical_min=0.0)
[0.0, 0.5, 1.0]
>>> normalize_tmm([0.0, None, 100.0], theoretical_min=0.0)
[0.0, None, 1.0]

normalize_zscore(scores)

Z-score standardization (mean=0, std=1).

Centers scores around mean and scales by standard deviation. If standard deviation is 0 (all scores equal), returns all zeros. None values are preserved and excluded from statistics calculation.

Parameters:

Name Type Description Default
scores list[float | None]

List of numeric scores to normalize. None values are preserved.

required

Returns:

Type Description
list[float | None]

List of z-score normalized values, with None preserved.

Example

>>> normalize_zscore([1.0, 2.0, 3.0])
[-1.2247..., 0.0, 1.2247...]
>>> normalize_zscore([1.0, None, 3.0])
[-1.0, None, 1.0]

pil_image_to_bytes(image)

Convert PIL image to bytes with mimetype.

Parameters:

Name Type Description Default
image Image

PIL Image object.

required

Returns:

Type Description
tuple[bytes, str]

Tuple of (image_bytes, mimetype).

pil_image_to_data_uri(image)

Convert PIL Image to data URI for multi-modal LLMs.

Parameters:

Name Type Description Default
image Image

PIL Image object.

required

Returns:

Type Description
str

Data URI string (e.g., "data:image/png;base64,iVBORw0...").

run_with_concurrency_limit(items, async_func, max_concurrency, error_message='Task failed') async

Run async function on items with concurrency limit using semaphore.

A generic utility for running async operations with controlled concurrency. Each item is processed by the async function, with at most max_concurrency operations running simultaneously.

Parameters:

Name Type Description Default
items Iterable[T]

Iterable of items to process.

required
async_func Callable[[T], Awaitable[R]]

Async function that takes an item and returns a result.

required
max_concurrency int

Maximum number of concurrent operations.

required
error_message str

Message to log when an operation fails.

'Task failed'

Returns:

Type Description
list[R | None]

List of results (or None if failed) in same order as items.

Example
async def embed_text(text: str) -> list[float]:
    return await some_api_call(text)

texts = ["hello", "world", "test"]
embeddings = await run_with_concurrency_limit(
    texts,
    embed_text,
    max_concurrency=5,
    error_message="Failed to embed text",
)

to_async_func(func)

Convert a synchronous function to an asynchronous function.

Parameters:

Name Type Description Default
func Callable[..., R]

The synchronous function to convert.

required

Returns:

Type Description
Callable[..., Awaitable[R]]

An asynchronous function that runs the synchronous function in a thread.
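"Runs the synchronous function in a thread" maps directly onto asyncio.to_thread; a sketch under that assumption (the wrapper name is made up):

```python
import asyncio
import functools
from collections.abc import Awaitable, Callable
from typing import TypeVar

R = TypeVar("R")

def make_async(func: Callable[..., R]) -> Callable[..., Awaitable[R]]:
    @functools.wraps(func)
    async def wrapper(*args, **kwargs) -> R:
        # Run the blocking function in a worker thread so the event loop stays free.
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper
```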

to_list(item)

Recursively convert collections to Python lists.

Parameters:

Name Type Description Default
item Any

The item to convert to a list. Can be numpy array, pandas Series, or any iterable collection.

required

Returns:

Type Description
Any

The converted Python list.

truncate_texts(str_list, max_tokens)

Truncate each string in the list to a maximum number of tokens using tiktoken.

Parameters:

Name Type Description Default
str_list list[str]

List of strings to be truncated.

required
max_tokens int

Maximum number of tokens allowed per string.

required

Returns:

Type Description
list[str]

List of truncated strings.

unpack_and_run(target_list, func, *args, **kwargs)

Flatten the sublists in target_list, run func once on the flattened list, and regroup the results by the original sublist lengths.

Parameters:

Name Type Description Default
target_list list[list[Any]]

List of sublists to be unpacked and processed.

required
func Callable

Callable function to run on the flattened list.

required
*args tuple

Additional positional arguments to pass to func.

()
**kwargs Any

Additional keyword arguments to pass to func.

{}

Returns:

Type Description
list[Any]

List of results grouped by original sublist lengths.
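The flatten/run/regroup behavior can be sketched as follows. The exact regrouping order is an assumption based on the description above, and the function name is illustrative.

```python
from collections.abc import Callable
from typing import Any

def run_flattened(target_list: list[list[Any]], func: Callable,
                  *args: Any, **kwargs: Any) -> list[Any]:
    flat = [item for sub in target_list for item in sub]
    results = func(flat, *args, **kwargs)  # one call on the flattened list
    # Regroup results to match the original sublist lengths.
    grouped, pos = [], 0
    for sub in target_list:
        grouped.append(results[pos:pos + len(sub)])
        pos += len(sub)
    return grouped
```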

validate_plugin_name(name)

Check whether name is a valid plugin name.

A valid plugin name starts with a lowercase letter and contains only lowercase letters, digits, and underscores. This guards against path traversal and code-injection when the name is interpolated into filesystem paths and Python source templates.

Parameters:

Name Type Description Default
name str

The candidate plugin name.

required

Returns:

Type Description
bool

True if valid, False otherwise.
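The rule above translates directly to a regex. This is a sketch of the check, not the exact source:

```python
import re

_PLUGIN_NAME = re.compile(r"^[a-z][a-z0-9_]*$")

def is_valid_plugin_name(name: str) -> bool:
    # Lowercase-letter start, then only lowercase letters, digits, underscores.
    return _PLUGIN_NAME.fullmatch(name) is not None
```

Rejecting anything outside this character set is what blocks path-traversal strings like "../evil" from reaching filesystem paths or source templates.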

autorag_research.exceptions

DuplicateRetrievalGTError

Bases: Exception

Raised when retrieval GT already exists for a query and upsert is False.

EmbeddingError

Bases: Exception

Raised when the embedding model is not set.

EmptyIterableError

Bases: Exception

Raised when an iterable is empty but should contain items.

EnvNotFoundError

Bases: Exception

Raised when a required environment variable is not found.

EvaluationError

Bases: ExecutorError

Raised when evaluation fails.

ExecutorError

Bases: Exception

Base exception for Executor errors.

HealthCheckError

Bases: ExecutorError

Raised when a health check fails before the full pipeline run.

InvalidDatasetNameError

Bases: NameError

Raised when an invalid dataset name is provided.

LLMError

Bases: Exception

Raised when the LLM model is not set.

LengthMismatchError

Bases: Exception

Raised when there is a length mismatch between two related lists.

LogprobsNotSupportedError

Bases: ExecutorError

Raised when a pipeline requires logprobs but the LLM doesn't support them.

MaxRetriesExceededError

Bases: ExecutorError

Raised when max retries are exceeded.

MissingDBNameError

Bases: Exception

Raised when the database name is missing in the configuration.

MissingRequiredParameterError

Bases: Exception

Raised when required parameters are missing.

NoQueryInDBError

Bases: Exception

Raised when there are no queries in the database.

NoSessionError

Bases: Exception

Raised when there is no active database session.

PipelineExecutionError

Bases: ExecutorError

Raised when a pipeline fails to execute.

PipelineVerificationError

Bases: ExecutorError

Raised when pipeline results fail verification.

RepositoryNotSupportedError

Bases: Exception

Raised when a repository is not supported by the current UoW.

RerankerError

Bases: Exception

Raised when the reranker model fails.

SchemaNotFoundError

Bases: Exception

Raised when a schema is not found.

ServiceNotSetError

Bases: Exception

Raised when the service is not set.

SessionNotSetError

Bases: Exception

Raised when the database session is not set.

UnsupportedDataSubsetError

Bases: Exception

Raised when an unsupported data subset is requested.

UnsupportedLanguageError

Bases: Exception

Raised when an unsupported language is specified.

UnsupportedMTEBTaskTypeError

Bases: Exception

Raised when an unsupported MTEB task type is provided.