These examples are real YAML files in the repository under `examples/pipelines/`. They are designed to match the current YAML parser expectations:
- Top-level `pipeline` must be a list of `{ stage, args }` entries
- Each stage entry must contain only `stage` and `args` (no extra keys)
- `fetch.traces` items must use `action`/`factory` plus optional `params`
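A minimal skeleton that satisfies these expectations (the URL and selector are illustrative placeholders; the full examples below are the authoritative reference):

```yaml
fetch:
  url: "https://example.com"
  traces:
    - { action: "visit", params: { url: "https://example.com" } }
pipeline:
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }
```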
```yaml
fetch:
  url: "https://books.toscrape.com"
pipeline:
  - stage: explore
    args: [ "li.next a", 2 ]
  - stage: join
    args: [ "article.product_pod h3 a", "LeftOuter" ]
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }
      - { selector: ".price_color", method: "text", as: "price_raw" }
      - { selector: ".product_main img", method: "attr:src", as: "image_src" }
```

```yaml
fetch:
url: "https://quotes.toscrape.com"
traces:
- { action: "visit", params: { url: "https://quotes.toscrape.com", cooldown: 0.5 } }
- { action: "wait", params: { seconds: 1 } }
pipeline:
- stage: visitJoin
args: [ "a.tag", "LeftOuter" ]
- stage: flatSelect
args:
- "div.quote"
- - { selector: "span.text", method: "text", as: "quote_text" }
- { selector: "small.author", method: "text", as: "author" }fetch:
url: "https://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html"
pipeline:
- stage: iextract
args:
- "Extract title as title and price as price and product code as sku"
- "prod_"fetch:
url: "https://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html"
pipeline:
- stage: extract
args:
- { selector: "h1", method: "text", as: "title" }
- { selector: ".price_color", method: "text", as: "price_raw" }
- { selector: "#content_inner article", method: "text", as: "description_text" }
- { field: "price_raw", method: "price", as: "price_numeric" }
- { field: "description_text", method: "llm", as: "llm_features" }
- { field: "description_text", method: "llm", args: ["extract 3 bullet points of benefits"], as: "llm_map" }python_extensions:
  stages:
    price_normalizer:
      type: row_transform
      function: |
        def price_normalizer(row):
            # ... your python code ...
            return row
fetch:
  url: "https://books.toscrape.com"
pipeline:
  - stage: extract
    args:
      - { selector: ".product_pod h3 a", method: "text", as: "title" }
      - { selector: ".product_pod .price_color", method: "text", as: "price_raw" }
  - stage: python_row_transform:price_normalizer
    args: []
```
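As a concrete illustration of what the placeholder body might do, here is a hedged sketch that parses the `price_raw` field produced by the extract stage above; the parsing logic is an assumption for illustration, not the repository's implementation:

```yaml
python_extensions:
  stages:
    price_normalizer:
      type: row_transform
      function: |
        def price_normalizer(row):
            # Assumed behavior: strip a leading currency symbol from
            # price_raw (e.g. "£51.77") and add a numeric price field.
            raw = (row.get("price_raw") or "").strip()
            try:
                row["price"] = float(raw.lstrip("£$€"))
            except ValueError:
                row["price"] = None
            return row
```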
These examples are meant to “prepare the ground” for verticals where you aggregate records from multiple sources. The key idea is:
- Union records coming from different upstream crawls / sources (even if schemas differ).
- Apply set semantics via dedup using a stable key (e.g. `sku`, `ean`, or `url`, plus `source`).
Current approach (workaround): Since YAML pipelines are sequential and don't support branching, the pattern is:
- Run source pipeline A → save output to disk
- Run source pipeline B → save output to disk
- Run a stitching pipeline → `load_union` + `dedup`
Files:
- Source A: `examples/pipelines/11-vertical-source-a-offers.yaml`
- Source B: `examples/pipelines/12-vertical-source-b-offers.yaml`
- Stitching: `examples/pipelines/13-vertical-stitch-union-dedup-offers.yaml`
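For orientation, a hedged sketch of the stitching step, assuming `load_union` takes the saved output paths and `dedup` takes the key fields; the paths and argument shapes are illustrative, not a copy of the actual 13-vertical-stitch-union-dedup-offers.yaml:

```yaml
pipeline:
  - stage: load_union
    args: [ "out/source-a-offers.jsonl", "out/source-b-offers.jsonl" ]
  - stage: dedup
    args: [ [ "sku", "source" ] ]
```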
Single-pipeline (now supported via `store`/`reset`/`union_with`):
To avoid intermediate disk I/O, you can branch in-memory and merge within the same pipeline using helper stages:
Option A (single pipeline with in-memory branches):
`examples/pipelines/17-single-pipeline-multi-source-union.yaml`
- `cache` (optional but recommended) before `store`, to avoid recompute when reusing the branch
- `store` caches the current dataset under a label
- `reset` starts a fresh dataset
- `union_with` unions the current dataset with a stored branch
Option B (same helpers, alternative flow):
`examples/pipelines/18-single-pipeline-alternative-syntax.yaml`
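To make the flow concrete, a hedged sketch of the branch-and-merge tail, assuming `store` and `union_with` take a branch label, `dedup` takes key fields, and `cache`/`reset` take no args; how the second branch acquires its data is elided because it is example-specific:

```yaml
  # ... earlier stages build the first branch (fetch + extract) ...
  - stage: cache
    args: []
  - stage: store
    args: [ "offers_a" ]
  - stage: reset
    args: []
  # ... stages that build the second branch ...
  - stage: union_with
    args: [ "offers_a" ]
  - stage: dedup
    args: [ [ "sku", "source" ] ]
```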
Further vertical examples in the same directory:
- Aggregates product offers from 5 commercial e-commerce sources, with product matching by EAN.
- Aggregates odds from 5 bookmakers for odds comparison. Uses `intelligent_explore` and `intelligent_flatSelect` to extract odds from bookmaker sites with complex, non-trivial table structures, then detects arbitrage (surebet) opportunities.
- Aggregates property listings from 5 real estate sites, clusters similar properties to identify the same property across sources, and detects arbitrage opportunities using external statistical sources (market averages, price percentiles).
- Builds an instruction-following dataset under a No-CC policy by combining customer-owned documentation (crawl) with pre-curated public domain and permissive non-CC datasets (CSV), then cleans and deduplicates.
- Aggregates multi-source financial data (prices, macroeconomic indicators, news/sentiment, alternative data), calculates technical indicators (RSI, MACD, Bollinger Bands, volatility), generates 90-day forward targets, aligns time-series data, and exports a training dataset for LLM fine-tuning. Part of the Feeless portfolio management layer for agentic pools.
Note: Paths can use environment placeholders `${VAR_NAME}` (resolved by the runner before parsing).
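For example (hypothetical `OUT_DIR` variable; `load_union` argument shape as assumed in the sketch above):

```yaml
  - stage: load_union
    args: [ "${OUT_DIR}/source-a-offers.jsonl" ]
```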
Aggregation examples:
- `examples/pipelines/15-aggregation-group-by-key.yaml` (sentiment → aggregate sentiment by key)
- `examples/pipelines/16-aggregation-monthly.yaml` (`sentiment_monthly` macro-stage)