Pipelines
A pipeline is a fully specified recipe for how to go from content, to embeddings, and back again.
Example ingestion pipeline
steps:
  - step: KnowledgeBaseFiles
    name: input
    step_args:
      # specify the source knowledgebases to sync
      knowledgebase_names: ["my_kb"]
      # specify the target index
      vector_index_name: my_vector_index
    inputs: []
  - step: Preprocessor
    name: preprocessor
    step_args: {}
    inputs: [input]
  - step: Chunker
    name: simple_chunker
    step_args:
      chunk_size_words: 320
      chunk_overlap: 30
    inputs: [preprocessor]
  - step: SentenceTransformerEmbedder
    name: sentence-transformers
    step_args:
      model_name: BAAI/bge-base-en-v1.5
      include_metadata: [title, file_name]
    inputs: [simple_chunker]
  - step: ChunkWriter
    name: save
    step_args:
      vector_index_name: my_vector_index
    inputs: [sentence-transformers]
Example query pipeline
steps:
  - step: SentenceTransformerEmbedder
    name: query_embedder
    step_args:
      model_name: BAAI/bge-base-en-v1.5
      include_metadata: [title, file_name]
      query: "placeholder"
    inputs: []
  - step: Retriever
    name: retriever
    step_args:
      vector_index_name: my_vector_index
      top_k: 100
      metadata_filters: {}
    inputs: [query_embedder]
  - step: Reranker
    name: reranker
    step_args:
      query: "placeholder"
      model_name: BAAI/bge-reranker-base
      top_k: 5
      metadata_filters: {}
    inputs: [retriever]
Steps
Each step is an atomic, constituent component of a pipeline. Each step is defined by three things:
Key | Value Type | Value Description |
---|---|---|
name | string | A name for this step (you can use this name to refer to this step in subsequent inputs sections). |
step_args | object | The step-specific arguments which are passed to this step at runtime. See these docs for the specific arguments you can pass for each step. |
inputs | array | An array of step names which this step depends on. A step's execution is triggered once all the steps in the dependency array have executed successfully. |
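To make the shape concrete, here is a minimal TypeScript sketch of a single step definition. It is illustrative only; PipelineStep is a hypothetical name, not a type exported by the OneContext SDK, and the fields simply mirror the YAML keys above.

// Illustrative sketch only: the shape of one pipeline step as written in YAML.
// `PipelineStep` is a hypothetical name, not part of the OneContext SDK.
interface PipelineStep {
  step: string                         // the step type, e.g. "Chunker"
  name: string                         // the name other steps reference in their `inputs` arrays
  step_args: Record<string, unknown>   // step-specific arguments passed at runtime
  inputs: string[]                     // names of the steps this step depends on
}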
Dependency Graphs
Behind the scenes, OneContext builds an execution graph of your steps from the dependencies declared in each step's inputs array. This graph is then used to execute your pipeline in the most efficient way possible.
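For intuition, here is a hedged sketch of what resolving that graph amounts to: a plain topological sort (Kahn's algorithm) over the inputs arrays. The actual scheduler is internal to OneContext and may differ; StepDef and executionOrder are made-up names for this example.

// Illustrative sketch only: derive a valid execution order from the `inputs`
// dependencies using Kahn's algorithm. `StepDef` and `executionOrder` are
// hypothetical names, not part of the OneContext SDK.
type StepDef = { name: string; inputs: string[] }

function executionOrder(steps: StepDef[]): string[] {
  // Track the unmet dependencies of each step, and which steps depend on it.
  const remaining = new Map<string, Set<string>>()
  const dependents = new Map<string, string[]>()
  for (const s of steps) {
    remaining.set(s.name, new Set(s.inputs))
    for (const dep of s.inputs) {
      dependents.set(dep, [...(dependents.get(dep) ?? []), s.name])
    }
  }

  // Steps with no dependencies can run immediately.
  const ready = steps.filter((s) => s.inputs.length === 0).map((s) => s.name)
  const order: string[] = []

  while (ready.length > 0) {
    const current = ready.shift()!
    order.push(current)
    // Release any step whose last pending dependency just finished.
    for (const next of dependents.get(current) ?? []) {
      const deps = remaining.get(next)!
      deps.delete(current)
      if (deps.size === 0) ready.push(next)
    }
  }

  if (order.length !== steps.length) throw new Error("cycle detected in step dependencies")
  return order
}

// For the ingestion pipeline above this yields:
// ["input", "preprocessor", "simple_chunker", "sentence-transformers", "save"]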
Deploy a new Pipeline
const indexPipelineCreateArgs: OneContext.PipelineCreateType = OneContext.PipelineCreateSchema.parse({
  API_KEY: API_KEY,
  pipelineName: indexPipelineName,
  pipelineYaml: "./quickstart/example_yamls/index.yaml",
})

OneContext.createPipeline(indexPipelineCreateArgs).then((res) => {console.log(res)})
where index.yaml refers to the pipeline configuration file in YAML format.
List all the Pipelines
const ListPipelinesArgs: OneContext.ListPipelinesType = OneContext.ListPipelinesSchema.parse({
  API_KEY: API_KEY,
  verbose: true
})

OneContext.listPipelines(ListPipelinesArgs).then((res) => {console.log(res)})
Setting verbose to true also returns the pipeline config (YAML file) for each pipeline.
Delete a Pipeline
const PipelineDeleteArgs: OneContext.PipelineDeleteType = OneContext.PipelineDeleteSchema.parse({
  API_KEY: API_KEY,
  pipelineName: "demoIndexPipeline"
})

OneContext.deletePipeline(PipelineDeleteArgs).then((res) => {
  if (res.ok) {
    console.log(`Deleted pipeline ${PipelineDeleteArgs.pipelineName} successfully`)
  }
})
Run a Pipeline with Override Arguments
Overriding specific step arguments in a pipeline lets you customize processing and retrieval for a single run:
The overrideArgs parameter allows you to modify the default arguments of each step in the pipeline for a specific run. Passed as a dictionary, it specifies the step names as keys and the step arguments to override as key-value pairs.
const query: string = "How much wood could a woodchuck chuck if a woodchuck could chuck wood?"
const QueryPipelineRunArgs: OneContext.RunType = OneContext.RunSchema.parse({
  API_KEY: API_KEY,
  pipelineName: QueryPipelineName,
  overrideArgs: {"query_embedder": {"query": query}, "retriever": {"top_k": 100}, "reranker": {"query": query}}
})
OneContext.runPipeline(QueryPipelineRunArgs).then((res) => {console.log(util.inspect(res, {showHidden: true, colors: true}))})