Complete reference of pipeline stages, their YAML syntax, configuration options, and extension mechanisms.
Pipeline stages are categorized into:
- Core Stages: Basic data extraction and navigation
- Intelligent Stages: LLM-powered stages for adaptive extraction
- Utility Stages: Specialized processing stages
- Use-Case Stages: Domain-specific stages
- Python Extensions: Custom Python stages
- Each `pipeline` entry must be a mapping with only `stage` (string) and `args` (array, optional).
- Extra keys on a stage item (for example `name`, `when`, etc.) are not supported by the Scala YAML parser and will fail parsing.
- Stage lookup is case-insensitive and tolerant of underscores: `visitJoin`, `visit_join`, and `visitjoin` resolve to the same registered stage.
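For example, a minimal valid entry uses only these two keys (the selector is illustrative); adding any other key, such as `name:`, would fail parsing:

```yaml
pipeline:
  - stage: join                 # resolved case-insensitively from the registry
    args: [ "a.product-link" ]  # optional; omit if the stage needs no arguments
```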
Purpose: Breadth-first link discovery and crawling
YAML Syntax:
```yaml
pipeline:
  - stage: explore
    args: [ "a.category-link", 2 ]  # selector, depth
```

Arguments:

- `selector` (String): CSS selector for links (or a `$column` reference)
- `depth` (Int, optional): Maximum depth to crawl (default: `1`)
Example:
```yaml
pipeline:
  - stage: explore
    args: [ "a.category", 3 ]
```

Purpose: Follow links for each row using HTTP (wget)

YAML Syntax:

```yaml
pipeline:
  - stage: join
    args: [ "a.product-link", "LeftOuter" ]  # selector, optional joinType
```

Arguments:

- `selector` (String): CSS selector for links (or a `$column` reference)
- `joinType` (String, optional): `Inner` or `LeftOuter` (default: `LeftOuter`)
Example:
```yaml
pipeline:
  - stage: join
    args: [ "a.detail-link", "Inner" ]
```

Purpose: Follow links using the `wgetJoin(...)` primitive.

YAML Syntax:

```yaml
pipeline:
  - stage: wgetJoin  # also works as wget_join
    args: [ "a.detail-link", "LeftOuter" ]
```

Arguments:

- `selector` (String): CSS selector for links (or a `$column` reference)
- `joinType` (String, optional): `Inner` or `LeftOuter` (default: `LeftOuter`)
Purpose: Breadth-first crawling using the `wgetExplore(...)` primitive.

YAML Syntax:

```yaml
pipeline:
  - stage: wgetExplore  # also works as wget_explore
    args: [ "li.next a", 2 ]
```

Arguments:

- `selector` (String): CSS selector for links (or a `$column` reference)
- `depth` (Int, optional): Maximum depth to crawl (default: `1`)
Purpose: Extract fields using selector-map configuration
YAML Syntax:
```yaml
pipeline:
  - stage: extract
    args:
      - { selector: "CSS_SELECTOR", method: "METHOD", as: "COLUMN_NAME" }
      - { selector: "...", method: "...", args: [...], as: "..." }
```

Selector-Map Structure:

```yaml
- selector: "span.price"   # Required: CSS selector
  method: "price"          # Required: extraction method
  as: "price_numeric"      # Optional: output column alias
```

Example:
```yaml
pipeline:
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }
      - { selector: "span.price", method: "price", as: "price_numeric" }
      - { selector: "a", method: "attr:href", as: "link" }
```

Purpose: Segment an HTML block and create one row per segment (one row per repeated element). Requires example-plugin.
YAML Syntax:
```yaml
pipeline:
  - stage: flatSelect
    args:
      - "div.product"   # segment selector
      - # extractors applied inside each segment (relative selectors)
        - { selector: "h3", method: "text", as: "name" }
        - { selector: ".price", method: "text", as: "price_raw" }
        - { selector: "img", method: "attr(src)", as: "img_src" }
```

Arguments:

- `segmentSelector` (String): CSS selector used to split the page into segments
- `extractors` (List[Map]): extractor maps applied to each segment (selectors are relative)
Example:
```yaml
pipeline:
  - stage: flatSelect
    args:
      - "div.product-item"
      - - { selector: "h3", method: "text", as: "product_name" }
        - { selector: ".price", method: "text", as: "price_raw" }
```

The Intelligent stages reduce repeated LLM calls by caching inferred selectors at template-cluster level:
- Template fingerprint: the HTML is fingerprinted (SimHash) and assigned to a cluster (layout template).
- Cache key: `(namespace, clusterId, kind, promptHash)`.
- RoadRunner prepopulation: on cache miss (or to improve generic selectors), we record HTML samples for the cluster and run RoadRunner template induction to draft a stable wrapper selector. This draft can later be validated/overridden by the LLM inference pipeline.

This is implemented in the core template layer (`SelectorResolver` + `RoadRunnerPrepopulator`) and used by stages like `intelligent_flatSelect` and `iextract`.
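As an illustrative sketch (the prompts, depth, and prefix below are examples, not required values), a crawl over many pages that share one layout template pays the selector-inference cost only once per template cluster; later pages of the same cluster reuse the cached selector:

```yaml
pipeline:
  - stage: intelligent_explore
    args: [ "pagination links to listing pages", 3 ]   # many pages, same layout template
  - stage: intelligent_flatSelect
    args: [ "product cards", "extract name and price", "product" ]
    # the selector inferred on the first page of the cluster is cached and reused afterwards
```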
Purpose: Like `explore`, but uses an LLM to infer the selector from a natural language prompt

YAML Syntax:

```yaml
pipeline:
  - stage: intelligent_explore
    args: [ "category links", 2 ]  # prompt, depth
```

Example:

```yaml
pipeline:
  - stage: intelligent_explore
    args: [ "navigation menu items", 1 ]
```

Purpose: Join with an inferred selector (PTA/LLM) + inferred actions
YAML Syntax:
```yaml
pipeline:
  - stage: intelligent_join
    args: [ "selectorPrompt or auto", "actionPrompt", limit ]
```

Example:

```yaml
pipeline:
  - stage: intelligent_join
    args: [ "auto", "click product link", 20 ]
```

Purpose: LLM extraction on an HTML block (chunking)
YAML Syntax:
```yaml
pipeline:
  - stage: iextract
    # extractor is optional; if omitted a default { selector: "body", method: "code" } is injected
    args: [ prompt, prefix ]
```

Example:

```yaml
pipeline:
  - stage: iextract
    args:
      - "Extract product specifications as title, price and sku"
      - "spec_"
```

Purpose: Segmentation + multi-row extraction with intelligent selector inference.
Key behavior:
- Accepts either a literal CSS selector (fast path) or a natural language segmentation prompt.
- Uses template-aware selector caching and RoadRunner (template recognition) to re-use inferred selectors across pages that share the same layout template.
YAML Syntax:
```yaml
pipeline:
  - stage: intelligent_flatSelect
    args: [ segPrompt, extrPrompt, prefix ]
```

Example:

```yaml
pipeline:
  - stage: intelligent_flatSelect
    args: [ "product cards", "extract name and price", "product" ]
```

All stages below are available only when the example-plugin is loaded and `Plugin.registerAll()` has been called.
Purpose: Fetch a URL from a column using HTTP.
Args:
- `0` (optional): URL extractor (usually `$url`). Defaults to `_` (internal default column).

```yaml
pipeline:
  - stage: wget
    args: [ "$url" ]
```

Purpose: Fetch a URL from a column using browser automation.
Args:
- `0` (optional): URL extractor (usually `$url`). Defaults to `_`.

```yaml
pipeline:
  - stage: visit
    args: [ "$url" ]
```

Purpose: Follow links using a browser (Visit) for each row.
Args:
- `0` (required): selector or `$column` containing URLs
- `1` (optional): joinType (`Inner`/`LeftOuter`, default `LeftOuter`)

```yaml
pipeline:
  - stage: visitJoin
    args: [ "a.product-link", "LeftOuter" ]
```

Purpose: Breadth-first crawling using a browser (Visit).
Args:
- `0` (required): selector or `$column` containing URLs
- `1` (optional): depth (Int, default `1`)

```yaml
pipeline:
  - stage: visitExplore
    args: [ "li.next a", 2 ]
```

Purpose: Segment the page into repeated blocks and extract one output row per block.
Args:
- `0` (required): segment selector (String)
- `1` (optional): list of extractor maps applied inside each segment

```yaml
pipeline:
  - stage: flatSelect
    args:
      - "div.product"
      - - { selector: "h3", method: "text", as: "name" }
        # NOTE: for flatSelect the plugin stage expects `attr(name)` style
        - { selector: "img", method: "attr(src)", as: "img_src" }
```

Purpose: Placeholder stage for LLM table extraction (currently does not perform extraction).

Args: none (current implementation does not use args).

```yaml
pipeline:
  - stage: intelligent_table
    args: []
```

Purpose: LLM-powered ad-hoc extraction from an HTML field (row transform).
Args:
- `0` (required): input field name containing HTML/text
- `1` (required): query/prompt
- `2` (required): output field name (result JSON string)

```yaml
pipeline:
  - stage: intelligentExtract
    args: [ "description_html", "Extract brand and model", "llm_json" ]
```

Purpose: LLM extraction that produces multiple columns (usually with a prefix).
Args:
- If the first argument is omitted or is not an extractor map / `$column`, a default extractor `{ selector: "body", method: "code" }` is injected.
- Then: `prompt` (String), `prefix` (optional String)

```yaml
pipeline:
  - stage: iextract
    args:
      - "Extract title as title and price as price and product code as sku"
      - "prod_"
```

Purpose: Debug helper; writes an `_echo` column.
Args:
- `0` (optional): message string

```yaml
pipeline:
  - stage: echo
    args: [ "hello" ]
```

Purpose: Persist the current dataset in Spark (equivalent to calling `.cache()` on the underlying dataset). Useful before expensive downstream stages.
Args: none.
```yaml
pipeline:
  - stage: cache
    args: []
```

Purpose: Compose multi-source pipelines inside a single YAML file:

- `store`: save the current dataset under a label
- `reset`: start a fresh empty dataset
- `union_with`: union the current dataset with one or more stored labels

Important: These are implemented by the YAML pipeline runner as control-flow helpers (not standard StageRegistry stages). They are still written as normal `stage:` items and follow the same YAML constraints.
```yaml
pipeline:
  # Source A
  - stage: visit
    args: [ "https://example.com" ]
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }
  - stage: cache
    args: []
  - stage: store
    args: [ "source_a" ]

  # Source B
  - stage: reset
    args: []
  - stage: load_csv
    args:
      - { path: "${SOURCE_B_CSV}", header: "true", inferSchema: "true" }
  - stage: store
    args: [ "source_b" ]

  # Merge
  - stage: reset
    args: []
  - stage: union_with
    args: [ "source_a", "source_b" ]
  - stage: dedup
    args: [ "url" ]
```

Purpose: Keep only rows whose country field is in the allowed list.
Args: one or more country codes.
```yaml
pipeline:
  - stage: filter_country
    args: [ "IT", "FR" ]
```

Purpose: Simple lexicon-based sentiment on a text field; adds `sentiment` and `count`.
Args:
- `0` (optional): text field name (default: `message`)

```yaml
pipeline:
  - stage: sentiment
    args: [ "message" ]
```

Purpose: Reduce-by-key aggregation; sums `sentiment` and `count` by key.
Args:
- `0` (optional): group field name (default: `entity`)

```yaml
pipeline:
  - stage: aggregatesentiment
    args: [ "country" ]
```

Purpose: Group-by-key and compute average sentiment into `avg_sentiment`.
Args:
- `0` (optional): group field name (default: `entity`)

```yaml
pipeline:
  - stage: avg_sentiment_by_key
    args: [ "country" ]
```

Purpose: Macro-stage that runs `sentiment`, derives `month` from `date` (YYYY-MM-DD), then runs `aggregatesentiment` by month.
Args:
- `0` (optional): text field name (default: `message`)

```yaml
pipeline:
  - stage: sentiment_monthly
    args: [ "message" ]
```

Purpose: Reduce-by-key aggregation; sums numeric `sales` by `country`.
Args: none.
```yaml
pipeline:
  - stage: sum_sales
    args: []
```

Purpose: Cluster real-estate listings (or other entities) using unsupervised learning (example-plugin stage). Produces a `cluster_id` to support entity resolution and arbitrage analysis.

Args: a single map with:

- `algorithm`: `kmeans` | `dbscan` (best-effort) | others (plugin-dependent)
- `k` (for kmeans), plus optional parameters depending on the algorithm
- `features`: list of numeric feature column names
```yaml
pipeline:
  - stage: propertyCluster
    args:
      - { algorithm: "kmeans", k: 5, features: ["latitude", "longitude", "price_per_sqm", "area_sqm"] }
```

Purpose: Load a CSV into the pipeline (creates a new dataset).

Args:

- `0` (required): either a path string OR a map `{ path: "...", <spark_options...> }`
- `1..n` (optional): `key=value` Spark read options
```yaml
pipeline:
  - stage: load_csv
    args:
      - { path: "s3a://bucket/input.csv", header: "true", inferSchema: "true" }
```

Purpose: Save current dataset as CSV (returns the same plan so you can continue).

Args:

- `0` (required): output path
- `1` (optional): mode (`overwrite`|`append`|`errorifexists`|`ignore`, default `overwrite`)
```yaml
pipeline:
  - stage: save_csv
    args: [ "s3a://bucket/out/", "overwrite" ]
```

All connector stages accept as first argument either:

- a string (path / table / resource), or
- a map `{ path: "...", <options...> }`, depending on the connector.

Available stages:

- `load_avro`
- `load_delta`
- `load_iceberg`
- `load_xml`
- `load_bigquery` (table or options with `table`)
- `load_athena` (JDBC options: `url`, `dbtable`, `driver`…)
- `load_mongodb` (options: `uri`, `database`, `collection`…)
- `load_cassandra` (`keyspace.table` or options with `keyspace` + `table`)
- `load_elasticsearch` (options with `es.resource`, `es.nodes`…)
- `load_kafka` (options with `kafka.bootstrap.servers`, `subscribe`…)
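As a hedged sketch of the map form (the URI, database, collection, and bucket names are placeholders; exact option keys follow the underlying Spark connector), a connector load followed by a CSV save might look like this:

```yaml
pipeline:
  # map form: connector options passed as a single map
  - stage: load_mongodb
    args:
      - { uri: "mongodb://localhost:27017", database: "shop", collection: "products" }
  - stage: save_csv
    args: [ "s3a://bucket/products_csv/", "overwrite" ]
```

The string form (for example `args: [ "s3a://bucket/data/" ]`) works the same way for path-based connectors such as `load_delta` or `load_avro`.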
These stages help you aggregate records from multiple upstream sources/pipelines using set semantics:
- union: combine rows from different sources (schema differences allowed)
- dedup: enforce “set union” by keeping 1 row per key

Purpose: Start a pipeline by loading multiple datasets and unioning them by column name (`allowMissingColumns=true`).

Args: one or more source spec maps:

- `format` (required): `csv` | `delta` | `avro` | `bigquery` | `mongo` | ...
- `path` (optional): dataset path/table identifier (depends on connector)
- `options` (optional): reader options
```yaml
pipeline:
  - stage: load_union
    args:
      - { format: "csv", path: "s3a://bucket/a.csv", options: { header: "true" } }
      - { format: "csv", path: "s3a://bucket/b.csv", options: { header: "true" } }
```

Purpose: Union the current dataset with one or more external datasets loaded from specs (also allows missing columns).
```yaml
pipeline:
  - stage: unionByName
    args:
      - { format: "delta", path: "s3a://bucket/upstream_delta/" }
```

Purpose: Deduplicate rows.

- If `args` is empty: full-row distinct
- Else: `args` are key column names (recommended for vertical stitching)

```yaml
pipeline:
  - stage: dedup
    args: [ "sku", "source" ]
```

Purpose: Search by EAN using a provider (Google Custom Search / Bing) and optionally enrich results.
Args:
- `0` (required): config map with keys like:
  - `provider`: `google` or `bing`
  - `ean`: literal EAN or `$ean_column`
  - `api_key`, `cx` (google), optional if provided via environment
  - `num_results`, `image_search`, `enrich`, `as`

```yaml
pipeline:
  - stage: searchEngine
    args:
      - provider: "google"
        ean: "$ean"
        num_results: 10
        image_search: true
        enrich: true
        as: "search_json"
```

Purpose: Fetch JSON from social/financial APIs (GET/POST) with auth via header or query param.
Args:
- `0` (required): config map with keys like `provider`, `endpoint`, `params`, `auth_token`, `method`, `body`, `as`

```yaml
pipeline:
  - stage: socialAPI
    args:
      - provider: "twitter"
        endpoint: "tweets/search/recent"
        params: { query: "python", max_results: 10 }
        auth_token: "${TWITTER_BEARER_TOKEN}"
        as: "tweets_json"
```

Purpose: Fetch Eurostat REST API JSON.
Args:
- `0` (required): config map: `dataset`, optional `filters`, optional `params`, `as`
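No example is given for this stage here; the following is a minimal sketch, assuming the stage is registered under a name like `eurostat` (check the plugin's stage registry for the actual name) and using an illustrative dataset code:

```yaml
pipeline:
  - stage: eurostat   # stage name assumed for illustration; verify against the registry
    args:
      - { dataset: "nama_10_gdp", as: "eurostat_json" }
```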
Purpose: Fetch ISTAT SDMX JSON.
Args:
- `0` (required): config map: `flow`, optional `key`, optional `provider`, optional `params`, `as`
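Likewise, a minimal sketch for the ISTAT stage, assuming a registered name like `istat` (hypothetical) and an illustrative SDMX dataflow:

```yaml
pipeline:
  - stage: istat      # stage name assumed for illustration; verify against the registry
    args:
      - { flow: "DCIS_POPRES1", as: "istat_json" }
```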
Purpose: Recalculate matching score based on input fields and the `iextract` output prefix.

Args:

- `0` (optional): config map (all keys optional; defaults are applied)

```yaml
pipeline:
  - stage: enrichMatchingScore
    args:
      - ean_field: "EAN number"
        description_field: "Item description"
        brand_field: "Brand"
        extracted_prefix: "prod_"
        output_field: "matching_score"
```

Purpose: Evaluate image similarity scores using LLM + heuristics. Expects image URLs in fields like `images` and `prod_product_image_urls`.
Args: none.
These stages exist and are registered, but several are currently no-op placeholders (see code comments in `UseCaseStages.scala`).

Purpose: normalize price strings into `price_numeric`, `price_currency`, and `price_<currency>`.

Args:

- `0` (optional): target currency (default `USD`)

Args: none.

Purpose: convert odds to `odds_decimal`.

Args: none.

Args: none.

Purpose: compute `area_sqm` and `price_per_sqm`.

Args: none.

Purpose: compute simple z-score outliers (`arbitrage_z`, `is_arbitrage`) from `price_per_sqm`.

Args: none.

Args: none.

Args:

- `0` (optional): max iterations (default `5`)
- `1` (optional): wait seconds (default `1.0`)
All stages below accept a single optional config map as `args[0]`.
Built-in extraction methods available in extract stages:
| Method | Description | Example |
|---|---|---|
| `text` | Extract visible text | `{ selector: "h1", method: "text", as: "title" }` |
| `code` | Extract full HTML | `{ selector: "div", method: "code", as: "html" }` |
| `html` | Alias of `code` (legacy) | `{ selector: "article", method: "html", as: "content" }` |
| `attr:NAME` | Extract HTML attribute (recommended for `extract`) | `{ selector: "a", method: "attr:href", as: "link" }` |
| `attrs` | Extract all attributes | `{ selector: "img", method: "attrs", as: "image_attrs" }` |
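These methods can be mixed within a single `extract` stage; the selectors and column names below are illustrative:

```yaml
pipeline:
  - stage: extract
    args:
      - { selector: "h1", method: "text", as: "title" }
      - { selector: "article", method: "code", as: "article_html" }
      - { selector: "a.buy", method: "attr:href", as: "buy_link" }
      - { selector: "img.hero", method: "attrs", as: "hero_attrs" }
```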
Custom resolvers are registered via plugins and can be used in YAML:
Available Custom Resolvers:
| Resolver | Description | Arguments | Example |
|---|---|---|---|
| `price` | Extract a numeric price token from text | none | `{ selector: "span.price", method: "price", as: "price_numeric" }` |
| `llm` | LLM-based extraction | optional instruction string (`args: [...]`) | `{ selector: "div", method: "llm", args: ["extract brand and model"], as: "llm_map" }` |
Usage:
```yaml
pipeline:
  - stage: extract
    args:
      - { selector: "span.price", method: "price", as: "price_numeric" }
      - { selector: "div.product", method: "llm", args: ["extract specifications"], as: "specs" }
```

How Attribute Resolvers Work:

- Resolution Chain: When using an extraction method, the system checks:
  - First: Native methods (`text`, `code`, `attr(*)`)
  - Then: the AttributeResolver Registry
  - Finally: Dynamic invocation if found
- Registration: Custom resolvers are registered in `AttributeResolverRegistry` during plugin initialization
- Arguments: Arguments are passed to the resolver in order via the `args` array
Built-in action factories for browser interactions:
| Action | Parameters | Example |
|---|---|---|
| `visit` | `url` (String), optional `cooldown` (seconds) | `{ action: "visit", params: { url: "https://example.com", cooldown: 0.5 } }` |
| `wait` | `seconds` (Int/Double) | `{ action: "wait", params: { seconds: 2 } }` |
| `scroll_to_bottom` | `behavior` = `smooth` or `auto` | `{ action: "scroll_to_bottom", params: { behavior: "smooth" } }` |
| `prompt` | `prompt` (String) or `text` (String) | `{ action: "prompt", params: { prompt: "click the login button" } }` |
Usage:
```yaml
fetch:
  url: "https://example.com"
  traces:
    - { action: "wait", params: { seconds: 1 } }
    - { action: "scroll_to_bottom", params: { behavior: "smooth" } }
    - { action: "prompt", params: { prompt: "click the login button" } }  # AI-driven action
```

Custom actions can be added via plugins by implementing an ActionFactory and making it discoverable by the runtime (ServiceLoader-based discovery).

Usage in YAML:

```yaml
fetch:
  url: "https://example.com"
  traces:
    - { action: "custom_click", params: { selector: ".button" } }
```

Python functions that transform entire rows:
Function Signature:
```python
from typing import Any, Dict

def my_transform(row: Dict[str, Any]) -> Dict[str, Any]:
    """Transform row data"""
    # Spark Row objects expose asDict(); plain dicts are copied as-is
    if hasattr(row, 'asDict'):
        row_dict = row.asDict()
    else:
        row_dict = dict(row)
    # Your transformation logic
    return row_dict
```

Registration:

```python
python_registry.register_row_transform("my_transform", my_transform)
```

Usage in YAML:

```yaml
pipeline:
  - stage: python_row_transform:my_transform
```

Python row transforms require the Spark job runner to register the Python registry via Py4J (`PythonBridgeRegistry.setPythonRegistry(...)`). If a function name is not registered, the corresponding `python_row_transform:<name>` stage will fail at runtime.
Some API flows support providing `python_extensions` alongside the pipeline YAML. The Scala YAML parser ignores this section, but the API can use it to generate PySpark code that registers these functions.

```yaml
python_extensions:
  stages:
    my_transform:
      type: row_transform
      function: |
        def my_transform(row):
            row["processed"] = True
            return row
```

```yaml
fetch:
url: "https://shop.example.com"
traces:
- { action: "wait", params: { seconds: 1 } }
- { action: "prompt", params: { prompt: "accept cookies if present" } }
pipeline:
# Intelligent exploration
- stage: intelligent_explore
args: [ "category links", 2 ]
# Join product pages (LLM-assisted selector + optional inferred actions + optional limit)
- stage: intelligent_join
args: [ "product detail links", "none", 20 ]
# Extract with native and custom resolvers
- stage: extract
args:
- { selector: "h1.title", method: "text", as: "product_title" }
- { selector: "span.price", method: "price", as: "price_numeric" }
- { selector: "div.description", method: "code", as: "description_html" }
- { selector: "img.product-image", method: "attr:src", as: "image_url" }
- { selector: "div.reviews", method: "llm", args: ["Extract average rating"], as: "rating" }
# Python transformation
- stage: python_row_transform:normalize_price
args: []
# Filter by country
- stage: filter_country
args: [ "US" ]- Start Simple: Begin with native methods (
text,code,attr) - Use Custom Resolvers: Only for complex, reusable extraction logic
- Prefer Wget: Use
wgetfor static content,visitonly when JavaScript is needed - AI Traces: Use AI-driven traces for adaptive scraping resilient to layout changes
- Python Extensions: Use Python extensions for complex data transformations
- Error Handling: Always handle edge cases in custom resolvers and Python functions
- Documentation: Document custom resolvers and Python functions with clear docstrings
```bash
curl -X POST https://api.webrobot.eu/api/webrobot/api/agents \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-agent",
    "categoryId": "1",
    "pipelineYaml": "pipeline:\n  - stage: join\n    args: [\"a.product-link\", \"LeftOuter\"]\n  - stage: extract\n    args:\n      - { selector: \"h1\", method: \"text\", as: \"title\" }",
    "enabled": true
  }'
```

```bash
curl -X PUT https://api.webrobot.eu/api/webrobot/api/agents/1/123 \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "pipelineYaml": "pipeline:\n  - stage: join\n    args: [\"a.product-link\"]\n  - stage: extract\n    args:\n      - { selector: \"h1\", method: \"text\", as: \"title\" }"
  }'
```

- Learn how to build a complete pipeline
- Explore Attribute Resolvers in detail
- Check out Python Extensions guide
- If you are using the EAN plugin, see EAN Image Sourcing Plugin for the stage set + CloudCredential injection behavior.
- Review the API Reference for complete endpoint documentation