This guide demonstrates how to build production-ready price comparison pipelines that aggregate product offers from multiple e-commerce sources, normalize prices, and track availability.
Goal: Build a unified product catalog by aggregating offers from multiple e-commerce sites, enabling:
- Price comparison: Compare prices for the same product across sources
- Availability tracking: Monitor stock levels and availability
- Price history: Track price changes over time
- Product matching: Match products across sources using EAN, SKU, or other identifiers
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Source A   │     │  Source B   │     │  Source C   │
│   (Crawl)   │     │    (API)    │     │  (Search)   │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       └───────────────────┼───────────────────┘
                           │
                    ┌──────▼──────┐
                    │   Union &   │
                    │ Deduplicate │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │  Normalize  │
                    │   Schema    │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │   Enrich    │
                    │ (EAN, etc)  │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │   Save to   │
                    │   Storage   │
                    └─────────────┘

Important: When starting a pipeline with crawling stages (`explore`, `join`, etc.), you must include a `fetch:` section with a starting URL. Pipelines that start with `load_csv` or other non-crawling stages don't require `fetch:`.
Use in-memory branching to combine multiple sources in a single pipeline:
# Complete example: Multi-source price aggregation
fetch:
url: "https://books.toscrape.com" # Starting URL for Source A
pipeline:
# ============================================
# Source A: Direct crawl
# ============================================
- stage: explore
args: [ "li.next a", 2 ]
- stage: join
args: [ "article.product_pod h3 a", "LeftOuter" ]
- stage: extract
args:
- { selector: "h1", method: "text", as: "title" }
- { selector: ".price_color", method: "text", as: "price_raw" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- { selector: "meta[property='product:ean']", method: "attr:content", as: "ean" }
- stage: cache
args: []
- stage: store
args: [ "source_a_offers" ]
# ============================================
# Source B: API-based discovery
# ============================================
- stage: reset
args: []
- stage: load_csv
args:
- { path: "${INPUT_CSV_PATH}", header: "true", inferSchema: "true" }
- stage: searchEngine
args:
- provider: "google"
ean: "$ean"
num_results: 5
enrich: true
- stage: visit
args: [ "$result_link" ]
- stage: extract
args:
- { selector: "title", method: "text", as: "title" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- { selector: "meta[property='product:price:amount']", method: "attr:content", as: "price_raw" }
- { selector: "meta[property='product:ean']", method: "attr:content", as: "ean" }
- stage: cache
args: []
- stage: store
args: [ "source_b_offers" ]
# ============================================
# Merge: Union all sources
# ============================================
- stage: reset
args: []
- stage: union_with
args: [ "source_a_offers", "source_b_offers" ]
# ============================================
# Deduplication by EAN (if available) or URL
# ============================================
- stage: dedup
args: [ "ean", "url" ] # Deduplicate by EAN first, then URL
# ============================================
# Normalize prices and schema
# ============================================
- stage: python_row_transform:normalize_price
args: []
# ============================================
# Save final result
# ============================================
- stage: save_csv
args: [ "${OUTPUT_PATH_STITCHED}", "overwrite" ]File: examples/pipelines/17-single-pipeline-multi-source-union.yaml
Run separate pipelines for each source, then stitch outputs:
Source A Pipeline:
# Source A: Direct crawl
fetch:
url: "https://books.toscrape.com"
pipeline:
- stage: explore
args: [ "li.next a", 2 ]
- stage: join
args: [ "article.product_pod h3 a", "LeftOuter" ]
- stage: extract
args:
- { selector: "h1", method: "text", as: "title" }
- { selector: ".price_color", method: "text", as: "price_raw" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- stage: save_csv
args: [ "${OUTPUT_PATH_A}", "overwrite" ]Source B Pipeline:
# Source B: API-based discovery
pipeline:
- stage: load_csv
args:
- { path: "${INPUT_CSV_PATH}", header: "true", inferSchema: "true" }
- stage: searchEngine
args:
- provider: "google"
ean: "$ean"
num_results: 5
enrich: true
- stage: visit
args: [ "$result_link" ]
- stage: extract
args:
- { selector: "title", method: "text", as: "title" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- { selector: "meta[property='product:price:amount']", method: "attr:content", as: "price_raw" }
- stage: save_csv
args: [ "${OUTPUT_PATH_B}", "overwrite" ]Stitching Pipeline:
# Stitching: Union and deduplicate
pipeline:
- stage: load_union
args:
- { format: "csv", path: "${OUTPUT_PATH_A}", options: { header: "true", inferSchema: "true" } }
- { format: "csv", path: "${OUTPUT_PATH_B}", options: { header: "true", inferSchema: "true" } }
- stage: dedup
args: [ "url" ]
- stage: save_csv
args: [ "${OUTPUT_PATH_STITCHED}", "overwrite" ]Files:
- Source A:
examples/pipelines/11-vertical-source-a-offers.yaml - Source B:
examples/pipelines/12-vertical-source-b-offers.yaml - Stitching:
examples/pipelines/13-vertical-stitch-union-dedup-offers.yaml
This example demonstrates a complete production pipeline that aggregates product offers from 5 major e-commerce sites and generates a product matching table for price comparison.
- Amazon - Direct crawl
- eBay - Direct crawl
- Walmart - Direct crawl
- Target - API-based (CSV input)
- Best Buy - Direct crawl
# Complete example: Price comparison across 5 e-commerce sites
fetch:
url: "https://www.amazon.com/s?k=electronics" # Starting URL for Amazon
pipeline:
# ============================================
# Site 1: Amazon (Direct crawl)
# ============================================
- stage: explore
args: [ "a.s-pagination-next", 5 ] # Navigate through search pages
- stage: join
args: [ "h2 a.a-link-normal", "LeftOuter" ] # Click on product links
- stage: extract
args:
- { selector: "span#productTitle", method: "text", as: "product_name" }
- { selector: "span.a-price-whole", method: "text", as: "price_raw" }
- { selector: "span.a-price-symbol", method: "text", as: "currency_symbol" }
- { selector: "span#productEAN", method: "text", as: "ean" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- { selector: "img#landingImage", method: "attr:src", as: "image_url" }
- stage: python_row_transform:normalize_amazon
args: []
- stage: python_row_transform:add_metadata
args:
- source: "amazon.com"
- stage: cache
args: []
- stage: store
args: [ "amazon_offers" ]
# ============================================
# Site 2: eBay (Direct crawl)
# ============================================
- stage: reset
args: []
- stage: visit
args: [ "https://www.ebay.com/sch/i.html?_nkw=electronics" ]
- stage: explore
args: [ "a.pagination__next", 5 ]
- stage: join
args: [ "h3.s-item__title a", "LeftOuter" ]
- stage: extract
args:
- { selector: "h1#x-item-title-label", method: "text", as: "product_name" }
- { selector: "span.notranslate", method: "text", as: "price_raw" }
- { selector: "span.ux-textspans", method: "text", as: "currency_symbol" }
- { selector: "div.ux-labels-values__values-content span", method: "text", as: "ean" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- { selector: "img#icImg", method: "attr:src", as: "image_url" }
- stage: python_row_transform:normalize_ebay
args: []
- stage: python_row_transform:add_metadata
args:
- source: "ebay.com"
- stage: cache
args: []
- stage: store
args: [ "ebay_offers" ]
# ============================================
# Site 3: Walmart (Direct crawl)
# ============================================
- stage: reset
args: []
- stage: visit
args: [ "https://www.walmart.com/search?q=electronics" ]
- stage: explore
args: [ "a[data-testid='pagination-next']", 5 ]
- stage: join
args: [ "a[data-testid='product-title']", "LeftOuter" ]
- stage: extract
args:
- { selector: "h1.prod-ProductTitle", method: "text", as: "product_name" }
- { selector: "span.price-characteristic", method: "text", as: "price_raw" }
- { selector: "span.price-currency", method: "text", as: "currency_symbol" }
- { selector: "span[itemprop='gtin13']", method: "text", as: "ean" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- { selector: "img[data-testid='product-image']", method: "attr:src", as: "image_url" }
- stage: python_row_transform:normalize_walmart
args: []
- stage: python_row_transform:add_metadata
args:
- source: "walmart.com"
- stage: cache
args: []
- stage: store
args: [ "walmart_offers" ]
# ============================================
# Site 4: Target (API-based - CSV input)
# ============================================
- stage: reset
args: []
- stage: load_csv
args:
- { path: "${TARGET_API_RESPONSE_PATH}", header: "true", inferSchema: "true" }
- stage: python_row_transform:normalize_target
args: []
- stage: python_row_transform:add_metadata
args:
- source: "target.com"
- stage: cache
args: []
- stage: store
args: [ "target_offers" ]
# ============================================
# Site 5: Best Buy (Direct crawl)
# ============================================
- stage: reset
args: []
- stage: visit
args: [ "https://www.bestbuy.com/site/searchpage.jsp?st=electronics" ]
- stage: explore
args: [ "a[aria-label='Next page']", 5 ]
- stage: join
args: [ "h4.sku-title a", "LeftOuter" ]
- stage: extract
args:
- { selector: "h1.heading-5", method: "text", as: "product_name" }
- { selector: "div.priceView-customer-price span", method: "text", as: "price_raw" }
- { selector: "span.currency-symbol", method: "text", as: "currency_symbol" }
- { selector: "span[itemprop='gtin13']", method: "text", as: "ean" }
- { selector: "link[rel=canonical]", method: "attr:href", as: "url" }
- { selector: "img.product-image", method: "attr:src", as: "image_url" }
- stage: python_row_transform:normalize_bestbuy
args: []
- stage: python_row_transform:add_metadata
args:
- source: "bestbuy.com"
- stage: cache
args: []
- stage: store
args: [ "bestbuy_offers" ]
# ============================================
# Merge: Union all sources
# ============================================
- stage: reset
args: []
- stage: union_with
args: [ "amazon_offers", "ebay_offers", "walmart_offers", "target_offers", "bestbuy_offers" ]
# ============================================
# Deduplication by EAN (if available) or URL
# ============================================
- stage: dedup
args: [ "ean", "url" ] # Prioritize EAN for matching, fallback to URL
# ============================================
# Price Normalization & Currency Conversion
# ============================================
- stage: python_row_transform:normalize_price
args: []
- stage: python_row_transform:convert_currency
args:
- target_currency: "USD" # Convert all prices to USD
# ============================================
# Save unified offers dataset
# ============================================
- stage: save_csv
args: [ "${OUTPUT_PATH_UNIFIED_OFFERS}", "overwrite" ]# python_extensions:
# stages:
# normalize_amazon:
# type: row_transform
# function: |
def normalize_amazon(row):
    """Normalize Amazon product data."""
    price_raw = row.get("price_raw", "").replace(",", "").strip()
    currency_symbol = row.get("currency_symbol", "$")
    try:
        row["price_numeric"] = float(price_raw)
        row["currency"] = "USD" if currency_symbol == "$" else "EUR"
    except (ValueError, TypeError):
        row["price_numeric"] = None
    row["ean"] = row.get("ean", "").strip()
    return row
# normalize_ebay:
# type: row_transform
# function: |
def normalize_ebay(row):
    """Normalize eBay product data."""
    price_raw = row.get("price_raw", "").replace("$", "").replace(",", "").strip()
    try:
        row["price_numeric"] = float(price_raw)
        row["currency"] = "USD"
    except (ValueError, TypeError):
        row["price_numeric"] = None
    return row
# normalize_walmart:
# type: row_transform
# function: |
def normalize_walmart(row):
    """Normalize Walmart product data."""
    price_raw = row.get("price_raw", "").replace("$", "").replace(",", "").strip()
    try:
        row["price_numeric"] = float(price_raw)
        row["currency"] = "USD"
    except (ValueError, TypeError):
        row["price_numeric"] = None
    return row
# normalize_target:
# type: row_transform
# function: |
def normalize_target(row):
    """Normalize Target API response."""
    try:
        row["price_numeric"] = float(row.get("price") or 0)
    except (ValueError, TypeError):
        row["price_numeric"] = None
    row["currency"] = row.get("currency", "USD")
    row["product_name"] = row.get("name", "")
    row["ean"] = row.get("gtin", "")
    return row
# normalize_bestbuy:
# type: row_transform
# function: |
def normalize_bestbuy(row):
    """Normalize Best Buy product data."""
    price_raw = row.get("price_raw", "").replace("$", "").replace(",", "").strip()
    try:
        row["price_numeric"] = float(price_raw)
        row["currency"] = "USD"
    except (ValueError, TypeError):
        row["price_numeric"] = None
    return row
# normalize_price:
# type: row_transform
# function: |
def normalize_price(row):
    """Final price normalization."""
    if not row.get("price_numeric"):
        price_raw = row.get("price_raw", "")
        # Try to extract a numeric value from the raw string
        import re
        numbers = re.findall(r'\d+\.?\d*', price_raw.replace(",", ""))
        if numbers:
            row["price_numeric"] = float(numbers[0])
    return row
# convert_currency:
# type: row_transform
# function: |
def convert_currency(row, target_currency="USD"):
    """Convert prices to target currency (simplified - use a real API in production)."""
    currency = row.get("currency", "USD")
    price = row.get("price_numeric")
    # Simplified conversion rates (use real-time rates in production)
    rates = {"USD": 1.0, "EUR": 1.1, "GBP": 1.25}
    if price is not None and currency != target_currency and currency in rates:
        row["price_numeric"] = price * rates[currency]
        row["currency"] = target_currency
    return row
Critical Challenge: The same product can appear with different structures and names across different e-commerce platforms. The EAN (European Article Number) is the primary reference key for product matching, but it's not always available or correctly formatted.

Primary Key: EAN Code
- EAN-13 (13 digits) is the most reliable identifier
- Normalize EAN: remove spaces, dashes, leading zeros
- Validate EAN format (checksum validation)
Fallback Matching (when EAN is missing or invalid):
- Product Name Normalization: Remove brand variations, standardize formatting
- SKU Matching: Use SKU if available (less reliable, retailer-specific)
- URL Canonicalization: Extract product ID from URL patterns
- Fuzzy Matching: Use similarity algorithms on product names + brand (see the sketch below)
Aggregation by EAN: All price comparisons must be aggregated by EAN code to create a unified comparison table.
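A minimal sketch of the fuzzy fallback using only the standard library; the 0.85 threshold is an assumption to tune against your data, and the brand + normalized-name key follows the fields produced by the transforms below:

```python
from difflib import SequenceMatcher

def fuzzy_match(offer_a, offer_b, threshold=0.85):
    """Fallback match on brand + normalized name when no valid EAN is present."""
    key_a = f"{offer_a.get('brand', '')} {offer_a.get('product_name_normalized', '')}".strip()
    key_b = f"{offer_b.get('brand', '')} {offer_b.get('product_name_normalized', '')}".strip()
    if not key_a or not key_b:
        return False
    return SequenceMatcher(None, key_a, key_b).ratio() >= threshold
```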
# python_extensions:
# stages:
# normalize_ean:
# type: row_transform
# function: |
def normalize_ean(row):
    """Normalize and validate EAN code."""
    ean_raw = row.get("ean", "").strip()
    # Remove spaces, dashes, and other separators
    ean_cleaned = ean_raw.replace(" ", "").replace("-", "").replace(".", "")
    # Strip leading zeros, then re-pad to 13 digits: this collapses
    # zero-padded GTIN-14/UPC variants onto a canonical EAN-13
    ean_cleaned = ean_cleaned.lstrip("0")
    if len(ean_cleaned) < 13:
        ean_cleaned = ean_cleaned.zfill(13)
    elif len(ean_cleaned) > 13:
        # Take the last 13 digits if longer
        ean_cleaned = ean_cleaned[-13:]
    # Validate the EAN-13 checksum: weights alternate 1,3 over the first 12 digits
    if len(ean_cleaned) == 13 and ean_cleaned.isdigit():
        checksum = 0
        for i in range(12):
            digit = int(ean_cleaned[i])
            checksum += digit if i % 2 == 0 else digit * 3
        check_digit = (10 - (checksum % 10)) % 10
        if check_digit == int(ean_cleaned[12]):
            row["ean_normalized"] = ean_cleaned
            row["ean_valid"] = True
        else:
            row["ean_normalized"] = ean_cleaned  # Keep even if checksum fails
            row["ean_valid"] = False
    else:
        row["ean_normalized"] = None
        row["ean_valid"] = False
    return row
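A quick sanity check on a known-valid code (4006381333931 is a commonly cited valid EAN-13):

```python
row = normalize_ean({"ean": "4006-3813-3393-1"})
assert row["ean_normalized"] == "4006381333931"
assert row["ean_valid"] is True
```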
# normalize_product_name:
# type: row_transform
# function: |
def normalize_product_name(row):
    """Normalize product name for fuzzy matching when EAN is missing."""
    import re
    product_name = row.get("product_name", "").strip()
    # Remove common condition prefixes/suffixes
    product_name = re.sub(r'^(New|Used|Refurbished|Renewed)\s+', '', product_name, flags=re.IGNORECASE)
    product_name = re.sub(r'\s+(New|Used|Refurbished|Renewed)$', '', product_name, flags=re.IGNORECASE)
    # Normalize whitespace
    product_name = re.sub(r'\s+', ' ', product_name).strip()
    # Remove special characters but keep spaces and hyphens
    product_name = re.sub(r'[^\w\s-]', '', product_name)
    # Lowercase for comparison
    row["product_name_normalized"] = product_name.lower()
    return row

WebRobot ETL pipelines emit row-level offers (one row per offer per source). Before price comparison, aggregate all offers by EAN code downstream: build the final product matching table (pivoted by source and aggregated by ean_normalized) using Trino/Spark SQL, as in the query below.
Assuming you exported the unified offers dataset to a table `offers` with at least: `ean_normalized`, `product_name`, `price_numeric`, `source`, `url`:
SELECT
ean_normalized AS ean,
max_by(product_name, length(product_name)) AS product_name_canonical,
min(CASE WHEN source = 'source_a' THEN price_numeric END) AS price_source_a,
min(CASE WHEN source = 'source_b' THEN price_numeric END) AS price_source_b,
min(CASE WHEN source = 'source_c' THEN price_numeric END) AS price_source_c,
min(CASE WHEN source = 'source_d' THEN price_numeric END) AS price_source_d,
min(CASE WHEN source = 'source_e' THEN price_numeric END) AS price_source_e,
min(price_numeric) AS best_price,
count(*) AS offers_count
FROM offers
WHERE ean_normalized IS NOT NULL
GROUP BY 1;

Replace source_a..source_e with your actual source values. You can also add URL pivots, availability flags, and price spread calculations in the same query.
The final matching table will look like:
| ean | product_name | price_amazon | price_ebay | price_walmart | price_target | price_bestbuy | best_price | best_source | price_difference_pct | sources_count |
|---|---|---|---|---|---|---|---|---|---|---|
| 0194253817003 | Apple iPhone 15 Pro Max 256GB | 1299.99 | 1249.99 | 1299.00 | 1299.99 | 1295.00 | 1249.99 | ebay | 4.0 | 5 |
| 0194253817004 | Samsung Galaxy S24 Ultra 512GB | 1199.99 | 1195.00 | 1199.00 | 1199.99 | 1198.00 | 1195.00 | ebay | 0.4 | 5 |
| 0194253817005 | Sony WH-1000XM5 Headphones | 399.99 | 389.99 | 399.00 | 399.99 | 395.00 | 389.99 | ebay | 2.5 | 5 |
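For reference, the price_difference_pct column above is consistent with the spread between the highest and lowest offer relative to the best price. The exact definition is an assumption, so adjust it to your reporting convention; a minimal sketch:

```python
def price_spread_pct(prices):
    """Spread between highest and lowest offer, as a percentage of the best (lowest) price."""
    best, worst = min(prices), max(prices)
    return round((worst - best) / best * 100, 1)

price_spread_pct([1299.99, 1249.99, 1299.00, 1299.99, 1295.00])  # 4.0, matching the first row
```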
Define a canonical schema that all sources map to:
# Canonical product offer schema
fields:
- title: string # Product title
- price: decimal # Normalized price (same currency)
- price_raw: string # Original price string (for debugging)
- currency: string # Currency code (EUR, USD, etc.)
- url: string # Product page URL
- ean: string # EAN-13 barcode (if available)
- sku: string # SKU (if available)
- availability: string # "in_stock", "out_of_stock", "preorder"
- source: string # Source identifier (e.g., "amazon", "ebay")
- image_url: string # Product image URL
- description: string # Product description
- brand: string # Brand name
- category: string # Product category
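For downstream Python transforms, the same contract can be mirrored as a dataclass. This is a sketch, not a required part of the pipeline; the field names follow the YAML schema above:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Offer:
    """Canonical offer record; mirrors the YAML schema above."""
    title: str
    price_raw: str                      # Original price string (for debugging)
    currency: str                       # Currency code (EUR, USD, etc.)
    url: str                            # Product page URL
    source: str                         # Source identifier (e.g., "amazon", "ebay")
    price: Optional[float] = None       # Normalized price (same currency)
    ean: Optional[str] = None           # EAN-13 barcode (if available)
    sku: Optional[str] = None           # SKU (if available)
    availability: Optional[str] = None  # "in_stock", "out_of_stock", "preorder"
    image_url: Optional[str] = None
    description: Optional[str] = None
    brand: Optional[str] = None
    category: Optional[str] = None
```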
Normalize prices to a common currency and format:

# Python extension: normalize_price
def normalize_price(row):
    import re
    from decimal import Decimal
    price_raw = row.get("price_raw", "")
    currency = row.get("currency", "EUR")
    # Extract the numeric value (thousands separators already stripped)
    price_match = re.search(r'\d+\.?\d*', price_raw.replace(',', ''))
    if price_match:
        price_value = Decimal(price_match.group())
        # Convert to EUR if needed (simplified example)
        if currency == "USD":
            price_value = price_value * Decimal("0.92")  # Example rate
            currency = "EUR"
        return {
            **row,
            "price": float(price_value),
            "currency": currency,
            "price_normalized": True
        }
    return {**row, "price": None, "price_normalized": False}

Use EAN codes for accurate product matching across sources:
# EAN enrichment pipeline
pipeline:
- stage: load_csv
args:
- { path: "${INPUT_CSV_PATH}", header: "true", inferSchema: "true" }
- stage: searchEngine
args:
- provider: "google"
ean: "$EAN number"
num_results: 10
image_search: true
enrich: true
as: "search_json"
- stage: visit
args: [ "$result_link" ]
- stage: iextract
args:
- "Extract: product name, brand, price, EAN if visible, and all product image URLs"
- "prod_"
- stage: enrichMatchingScore
args: [ { input_ean_field: "EAN number", iextract_prefix: "prod_" } ]
- stage: save_csv
args: [ "${OUTPUT_PATH}", "overwrite" ]File: examples/pipelines/07-searchengine-ean-enrich.yaml
Choose deduplication keys based on data quality:
If EAN codes are available and reliable:
- stage: dedup
args: [ "ean" ] # Deduplicate by EANIf EAN is not available:
- stage: dedup
args: [ "url" ] # Deduplicate by URLCombine multiple keys for better accuracy:
- stage: dedup
args: [ "ean", "url", "source" ] # Deduplicate by EAN, then URL, then sourceAfter aggregation, analyze price differences:
After aggregation, analyze price differences:

-- Query via Trino (after pipeline execution)
SELECT
ean,
title,
MIN(price) as min_price,
MAX(price) as max_price,
AVG(price) as avg_price,
COUNT(*) as offer_count,
MAX(price) - MIN(price) as price_range
FROM stitched_offers
WHERE ean IS NOT NULL
GROUP BY ean, title
HAVING COUNT(*) > 1
ORDER BY price_range DESC
LIMIT 100;

Always use environment variables for paths:
- stage: save_csv
args: [ "${OUTPUT_PATH}", "overwrite" ]Cache intermediate results before storing branches:
- stage: cache
args: []
- stage: store
args: [ "branch_label" ]Normalize prices and schemas as early as possible:
- stage: extract
args: [ ... ]
- stage: python_row_transform:normalize_price
args: []

Validate EAN codes before using them for matching:
def validate_ean(ean):
    """Validate an EAN-8 or EAN-13 code, including its checksum."""
    if not ean or not ean.isdigit() or len(ean) not in (8, 13):
        return False
    digits = [int(d) for d in ean]
    # Weights alternate 3,1,3,... starting from the digit left of the check digit
    checksum = sum(d * (3 if i % 2 == 0 else 1)
                   for i, d in enumerate(reversed(digits[:-1])))
    return (10 - checksum % 10) % 10 == digits[-1]
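Example checks (checksums verified by hand):

```python
validate_ean("9780201379624")  # True  (13 digits, checksum holds)
validate_ean("96385074")       # True  (valid EAN-8)
validate_ean("9780201379625")  # False (checksum mismatch)
```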
Handle missing fields gracefully:

- stage: extract
args:
- { selector: "h1", method: "text", as: "title", default: "Unknown" }
- { selector: ".price_color", method: "text", as: "price_raw", default: "0.00" }- Vertical Use Cases: Overview of all vertical use cases
- Build a Pipeline: Learn pipeline fundamentals
- Pipeline Stages Reference: Complete stage reference
- EAN Image Sourcing Plugin: EAN-based product matching