Pipeline Examples

These examples are real YAML files in the repository under examples/pipelines/. They are designed to match the current YAML parser expectations:

  • Top-level pipeline must be a list of { stage, args }
  • Each stage entry must contain only stage and args (no extra keys)
  • fetch.traces items must use action/factory + optional params
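
Concretely, the smallest shape that satisfies these constraints looks like the skeleton below (the URL is a placeholder; the extract stage and the visit action are the same ones used in the full examples that follow):

fetch:
  url: "https://example.com"
  traces:
    - { action: "visit", params: { url: "https://example.com" } }

pipeline:
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }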

Static crawl (HTTP only)

fetch:
  url: "https://books.toscrape.com"

pipeline:
  - stage: explore
    args: [ "li.next a", 2 ]
  - stage: join
    args: [ "article.product_pod h3 a", "LeftOuter" ]
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }
      - { selector: ".price_color", method: "text", as: "price_raw" }
      - { selector: ".product_main img", method: "attr:src", as: "image_src" }

Dynamic crawl (browser + flatSelect)

fetch:
  url: "https://quotes.toscrape.com"
  traces:
    - { action: "visit", params: { url: "https://quotes.toscrape.com", cooldown: 0.5 } }
    - { action: "wait", params: { seconds: 1 } }

pipeline:
  - stage: visitJoin
    args: [ "a.tag", "LeftOuter" ]
  - stage: flatSelect
    args:
      - "div.quote"
      - - { selector: "span.text", method: "text", as: "quote_text" }
        - { selector: "small.author", method: "text", as: "author" }

LLM extraction (iextract) with prompt-only args

fetch:
  url: "https://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html"

pipeline:
  - stage: iextract
    args:
      - "Extract title as title and price as price and product code as sku"
      - "prod_"

Attribute resolvers (column-based + selector-based)

fetch:
  url: "https://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html"

pipeline:
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }
      - { selector: ".price_color", method: "text", as: "price_raw" }
      - { selector: "#content_inner article", method: "text", as: "description_text" }
      - { field: "price_raw", method: "price", as: "price_numeric" }
      - { field: "description_text", method: "llm", as: "llm_features" }
      - { field: "description_text", method: "llm", args: ["extract 3 bullet points of benefits"], as: "llm_map" }

Python row transforms (python_row_transform:<name>)

python_extensions:
  stages:
    price_normalizer:
      type: row_transform
      function: |
        def price_normalizer(row):
            # Illustrative body (an assumption, not the repository version):
            # turn a "£51.77"-style price_raw into a float, assuming row is dict-like.
            raw = str(row.get("price_raw", "")).lstrip("£").strip()
            row["price_numeric"] = float(raw) if raw else None
            return row

fetch:
  url: "https://books.toscrape.com"

pipeline:
  - stage: extract
    args:
      - { selector: ".product_pod h3 a", method: "text", as: "title" }
      - { selector: ".product_pod .price_color", method: "text", as: "price_raw" }
  - stage: python_row_transform:price_normalizer
    args: []

I/O operations (CSV load/save)
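
No CSV stage names appear elsewhere on this page, so the sketch below assumes hypothetical load_csv and save_csv stages; check the file under examples/pipelines/ for the real stage names and argument order.

pipeline:
  - stage: load_csv            # hypothetical stage name
    args: [ "data/seeds.csv" ]
  - stage: dedup               # dedup is referenced later on this page; args format assumed
    args: [ "url" ]
  - stage: save_csv            # hypothetical stage name
    args: [ "data/seeds_clean.csv" ]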

Search engine integration (EAN enrichment)

Browser automation (fetch.traces with actions)
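
The dynamic crawl example above already shows the visit and wait actions; a longer trace sequence might look like the sketch below. The click action is an assumption (only visit and wait appear elsewhere on this page).

fetch:
  url: "https://quotes.toscrape.com/js/"
  traces:
    - { action: "visit", params: { url: "https://quotes.toscrape.com/js/", cooldown: 0.5 } }
    - { action: "wait", params: { seconds: 1 } }
    - { action: "click", params: { selector: "li.next a" } }   # assumed action name and params
    - { action: "wait", params: { seconds: 1 } }

pipeline:
  - stage: flatSelect
    args:
      - "div.quote"
      - - { selector: "span.text", method: "text", as: "quote_text" }
        - { selector: "small.author", method: "text", as: "author" }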

Multi-source aggregation (set union + dedup) — vertical-ready

These examples are meant to “prepare the ground” for verticals where you aggregate records from multiple sources. The key idea is:

  • Union records coming from different upstream crawls / sources (even if schemas differ).
  • Apply set semantics via dedup using a stable key (e.g. sku, ean, url, plus source).

1) Union seed lists, dedup by URL, then fetch + extract
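
A sketch of this shape, assuming load_union takes file paths and dedup takes the key column (the exact argument formats may differ from the repository file):

pipeline:
  - stage: load_union          # stage name mentioned on this page; args format assumed
    args: [ "seeds_source_a.csv", "seeds_source_b.csv" ]
  - stage: dedup               # set semantics: keep one row per seed URL
    args: [ "url" ]
  # ...the fetch + extract stages then follow the pattern of the static crawl example above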

2) Stitch outputs from multiple crawls, then dedup by business key
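
The same two stages, this time over saved crawl outputs and keyed by a business identifier (a sketch; file and column names are illustrative):

pipeline:
  - stage: load_union
    args: [ "output_crawl_a.csv", "output_crawl_b.csv" ]
  - stage: dedup               # stable business key, e.g. ean (or sku, url, plus source)
    args: [ "ean" ]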

3) Vertical pattern: run 2 source pipelines, then stitch their outputs

Current approach (workaround): Since YAML pipelines are sequential and don't support branching, the pattern is:

  • Run source pipeline A → save output to disk
  • Run source pipeline B → save output to disk
  • Run stitching pipeline → load_union + dedup

Files:

Single-pipeline (now supported via store/reset/union_with):

To avoid intermediate disk I/O, you can branch in-memory and merge within the same pipeline using helper stages:
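
A sketch of that pattern, assuming store stashes the current dataset under a name, reset clears the working dataset, and union_with merges a named buffer back in (the argument formats are assumptions; see the repository example for the exact signatures):

fetch:
  url: "https://books.toscrape.com"

pipeline:
  - stage: extract                   # branch A
    args:
      - { selector: ".product_pod h3 a", method: "text", as: "title" }
  - stage: store                     # stash branch A in memory (assumed args)
    args: [ "branch_a" ]
  - stage: reset                     # clear the working dataset before building branch B
    args: []
  - stage: extract                   # branch B
    args:
      - { selector: ".product_pod .price_color", method: "text", as: "price_raw" }
  - stage: union_with                # merge branch A back into the current dataset
    args: [ "branch_a" ]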

Vertical use cases

Price comparison (5 e-commerce sites)

Aggregates product offers from 5 e-commerce sites, matching products across sources by EAN.

Sports betting odds aggregation (5 bookmakers)

Aggregates odds from 5 bookmakers for side-by-side comparison.

Surebet detection (intelligent extraction)

Uses intelligent_explore and intelligent_flatSelect to extract odds from bookmaker sites with complex, non-trivial table structures, then detects arbitrage (surebet) opportunities.

Real estate arbitrage (property clustering)

Aggregates property listings from 5 real estate sites, clusters similar properties to identify the same property across sources, and detects arbitrage opportunities using external statistical sources (market averages, price percentiles).

LLM fine-tuning dataset construction

Builds an instruction-following dataset under a No-CC policy by combining customer-owned documentation (crawl) with pre-curated public domain and permissive non-CC datasets (CSV), then cleans and deduplicates the combined set.

Portfolio management & 90-day asset prediction

Aggregates multi-source financial data (prices, macroeconomic indicators, news/sentiment, alternative data), calculates technical indicators (RSI, MACD, Bollinger Bands, volatility), generates 90-day forward targets, aligns time-series data, and exports a training dataset for LLM fine-tuning. Part of the Feeless portfolio management layer for agentic pools.

Note: Paths can use environment placeholders ${VAR_NAME} (resolved by the runner before parsing).
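
For example (load_union is the stage discussed above; the variable name is illustrative):

pipeline:
  - stage: load_union
    args: [ "${DATA_DIR}/output_crawl_a.csv", "${DATA_DIR}/output_crawl_b.csv" ]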

4) Append upstream dataset to the current dataset

Aggregations (group-by style)