Pipelines

A pipeline is a fully-specified recipe for how to go from content to embeddings, and back again.

Example ingestion pipeline
steps:
  - step: KnowledgeBaseFiles
    name: input
    step_args:
      # specify the source knowledgebases to sync
      knowledgebase_names: ["my_kb"]
      # specify the target index
      vector_index_name: my_vector_index
    inputs: []

  - step: Preprocessor
    name: preprocessor
    step_args: {}
    inputs: [input]

  - step: Chunker
    name: simple_chunker
    step_args:
      chunk_size_words: 320
      chunk_overlap: 30
    inputs: [preprocessor]

  - step: SentenceTransformerEmbedder
    name: sentence-transformers
    step_args:
      model_name: BAAI/bge-base-en-v1.5
      include_metadata: [ title, file_name ]
    inputs: [ simple_chunker ]

  - step: ChunkWriter
    name: save
    step_args:
      vector_index_name: my_vector_index
    inputs: [sentence-transformers]

Example query pipeline
steps:
  - step: SentenceTransformerEmbedder
    name: query_embedder
    step_args:
      model_name: BAAI/bge-base-en-v1.5
      include_metadata: [ title, file_name ]
      # overridden at runtime via override_args
      query: "placeholder"
    inputs: [ ]

  - step: Retriever
    name: retriever
    step_args:
      vector_index_name: my_vector_index
      top_k: 100
      metadata_filters: { }
    inputs: ["query_embedder"]

  - step: Reranker
    name: reranker
    step_args:
      # overridden at runtime via override_args
      query: "placeholder"
      model_name: BAAI/bge-reranker-base
      top_k: 5
      metadata_filters: { }
    inputs: [ retriever ]

Steps

Each step is an atomic component of a pipeline. Each step is defined by four keys:

Key | Value Type | Description
step | string | The type of the step to run (for example, Chunker or Retriever).
name | string | A name for this step (you can use this name to refer to this step in subsequent inputs arrays).
step_args | object | The step-specific arguments passed to this step at runtime. See these docs for the specific arguments each step accepts.
inputs | array | An array of step names this step depends on. A step executes only once every step in its inputs array has completed successfully.
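As an illustration of the schema above, a minimal check could confirm that each step definition carries these keys and that every entry in an inputs array names another step in the same pipeline. This validator is hypothetical, not part of the OneContext SDK:

```python
# Hypothetical validator (not part of the OneContext SDK): check that each
# step definition carries the four keys described above, and that every
# entry in an `inputs` array refers to another step in the same pipeline.
REQUIRED_KEYS = {"step", "name", "step_args", "inputs"}

def validate_steps(steps):
    names = {s.get("name") for s in steps}
    for s in steps:
        missing = REQUIRED_KEYS - s.keys()
        if missing:
            raise ValueError(f"step {s.get('name', '?')!r} is missing keys: {sorted(missing)}")
        for dep in s["inputs"]:
            if dep not in names:
                raise ValueError(f"step {s['name']!r} depends on unknown step {dep!r}")
    return True

validate_steps([
    {"step": "KnowledgeBaseFiles", "name": "input", "step_args": {}, "inputs": []},
    {"step": "Chunker", "name": "simple_chunker", "step_args": {}, "inputs": ["input"]},
])
```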

Dependency Graphs

Behind the scenes, OneContext builds an execution graph of your steps from the dependencies declared in their inputs arrays. This graph is then used to execute your pipeline as efficiently as possible.
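The scheduling idea can be sketched with a topological sort: each step becomes ready once all of its dependencies have run. This is an illustrative sketch with hypothetical step names, not the OneContext implementation:

```python
from collections import deque

# Illustrative sketch (not the OneContext implementation): build an
# execution graph from each step's `inputs` array and emit the steps in
# an order where every step runs after all of its dependencies.
# The step names here are hypothetical.
steps = {
    "load": [],
    "clean": ["load"],
    "chunk": ["clean"],
    "embed": ["chunk"],
    "write": ["embed"],
}

def execution_order(steps):
    """Topologically sort steps by their dependency arrays (Kahn's algorithm)."""
    indegree = {name: len(deps) for name, deps in steps.items()}
    dependents = {name: [] for name in steps}
    for name, deps in steps.items():
        for dep in deps:
            dependents[dep].append(name)
    ready = deque(sorted(n for n, d in indegree.items() if d == 0))
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in dependents[name]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(steps):
        raise ValueError("cycle detected in step dependencies")
    return order

print(execution_order(steps))  # → ['load', 'clean', 'chunk', 'embed', 'write']
```

In a real scheduler, steps whose dependencies are all satisfied at the same time can also run in parallel rather than strictly one after another.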

Deploy a new Pipeline

from onecontext import OneContext

oc = OneContext()

pipeline = oc.deploy_pipeline("my_pipeline", "pipeline_config.yml")

where pipeline_config.yml refers to the pipeline configuration file in YAML format.

List all the Pipelines

from onecontext import OneContext

oc = OneContext()

all_pipelines = oc.list_pipelines()

Delete a Pipeline

from onecontext import OneContext

oc = OneContext()

oc.delete_pipeline("my_pipeline")

Run a Pipeline with Override Arguments

Overriding specific step arguments lets you customize processing and retrieval for a single run. The override_args parameter is passed as a dictionary whose keys are step names and whose values are the step arguments to override as key-value pairs.
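Conceptually, the override is a shallow merge of your per-run values over each step's default step_args. The sketch below shows that idea with plain dictionaries; it is not the SDK's actual merge code, and the default values are taken from the example pipelines above:

```python
# Conceptual sketch (not the SDK's actual merge code): override_args is
# merged over each step's default step_args for a single run.
defaults = {
    "retriever": {"vector_index_name": "my_vector_index", "top_k": 100},
    "reranker": {"model_name": "BAAI/bge-reranker-base", "top_k": 5},
}
override_args = {"retriever": {"top_k": 20}}

merged = {
    step: {**args, **override_args.get(step, {})}
    for step, args in defaults.items()
}
print(merged["retriever"]["top_k"])  # → 20
print(merged["reranker"]["top_k"])   # → 5 (untouched default)
```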

from onecontext import OneContext

oc = OneContext()

pipeline = oc.Pipeline("my_pipeline")


# Runtime values to substitute into the pipeline steps for this run
query = "What is OneContext?"
retriever_top_k = 100
top_k = 5

override_args = {
    "query_embedder": {"query": query},
    "retriever": {"top_k": retriever_top_k},
    "reranker": {"top_k": top_k, "query": query},
}

chunks = pipeline.run(override_args)