API Integrations

Use without Airtable via API mode

The Airtable Lead Enricher can be used as a standalone API service without requiring an Airtable base. This enables integration with any data pipeline, workflow automation, or business intelligence tool.

API Mode Overview

Feature              Value
Max companies/run    100
Input                JSON array
Output               Dataset + Webhook
Airtable required    No

Python Example

Enrich leads using Python with both asynchronous and synchronous approaches:

Asynchronous (Recommended for Large Batches)

import requests
import time

APIFY_TOKEN = "your_token"

# Start run
run = requests.post(
    "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/runs",
    params={"token": APIFY_TOKEN},
    json={
        "mode": "api",
        "companies": [
            {"companyName": "Acme Corp", "website": "https://acme.com"}
        ],
        "enrichment": {
            "sources": ["google_maps", "website", "hunter"],
            "hunter": {
                "enabled": True,
                "apiKey": "YOUR_HUNTER_KEY"
            }
        },
        "llm": {
            "enabled": True,
            "provider": "openai",
            "apiKey": "YOUR_OPENAI_KEY"
        }
    }
).json()

# Wait for completion
run_id = run["data"]["id"]

while True:
    status = requests.get(
        f"https://api.apify.com/v2/actor-runs/{run_id}",
        params={"token": APIFY_TOKEN}
    ).json()

    if status["data"]["status"] in ["SUCCEEDED", "FAILED", "TIMED-OUT", "ABORTED"]:
        break
    time.sleep(5)

# Get results
dataset_id = status["data"]["defaultDatasetId"]
results = requests.get(
    f"https://api.apify.com/v2/datasets/{dataset_id}/items",
    params={"token": APIFY_TOKEN}
).json()

print(results)

Synchronous (Wait for Results)

response = requests.post(
    "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
    params={"token": APIFY_TOKEN},
    json={"mode": "api", "companies": [...]},
    timeout=300
)
results = response.json()

Node.js Example

Enrich leads using the Apify JavaScript client:

const { ApifyClient } = require('apify-client');
const client = new ApifyClient({ token: 'your_token' });

async function enrichLeads(companies) {
    const run = await client.actor('datahq/airtable-lead-enricher').call({
        mode: 'api',
        companies: companies,
        enrichment: {
            sources: ['google_maps', 'website', 'hunter'],
            hunter: {
                enabled: true,
                apiKey: 'YOUR_HUNTER_KEY'
            }
        },
        llm: {
            enabled: true,
            provider: 'openai',
            apiKey: 'YOUR_OPENAI_KEY'
        }
    });

    const { items } = await client.dataset(run.defaultDatasetId).listItems();
    return items;
}

// Usage
const companies = [
    { companyName: 'Acme Corp', website: 'https://acme.com' }
];

enrichLeads(companies).then(results => {
    console.log(results);
});

Output Schema

Each enriched lead is returned as a JSON object with the following structure:

{
  "companyName": "Acme Corp",
  "website": "https://acme.com",
  "location": "San Francisco, CA",
  "email": "contact@acme.com",
  "phone": "+1 555 0100",
  "address": "123 Main St, San Francisco, CA",
  "rating": 4.5,
  "reviewCount": 127,
  "linkedinUrl": "https://linkedin.com/company/acme",
  "facebookUrl": "https://facebook.com/acme",
  "twitterHandle": "@acme",
  "industry": "Technology",
  "leadScore": 85,
  "icpScore": 36,
  "icpReasoning": "Strong tech stack alignment...",
  "summary": "Acme Corp is an enterprise...",
  "dataConfidence": 0.87,
  "enrichedAt": "2025-12-19T14:30:00Z",
  "enrichmentSources": ["google_maps", "website", "hunter"]
}
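
The enriched records are plain JSON, so downstream filtering is straightforward. A minimal sketch (field names come from the schema above; the thresholds are arbitrary examples, not recommendations):

# Keep only high-quality leads from a list of enriched records
def filter_leads(results, min_score=70, min_confidence=0.8):
    """Filter enriched leads by leadScore and dataConfidence."""
    return [
        lead for lead in results
        if lead.get("leadScore", 0) >= min_score
        and lead.get("dataConfidence", 0) >= min_confidence
    ]

qualified = filter_leads(results)
print(f"{len(qualified)} of {len(results)} leads passed the quality bar")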

Webhook Integration

Receive enrichment results via webhook for event-driven architectures:

Configure Webhook

{
  "mode": "api",
  "companies": [...],
  "webhookUrl": "https://your-system.com/webhook"
}

Webhook Payload

When the enrichment completes, you'll receive:

{
  "type": "RUN_COMPLETED",
  "runId": "abc123",
  "datasetId": "xyz789",
  "timestamp": "2025-12-19T14:35:00Z",
  "stats": {
    "totalCompanies": 50,
    "successful": 47,
    "failed": 3,
    "avgLeadScore": 72.4,
    "avgDataConfidence": 0.81
  },
  "results": [...]
}
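
On the receiving side, any HTTPS endpoint that accepts a JSON POST will work. A minimal receiver sketch using Flask (the framework choice and route path are assumptions, not part of the actor):

from flask import Flask, request

app = Flask(__name__)

@app.route("/webhook", methods=["POST"])
def handle_enrichment_webhook():
    payload = request.get_json()

    if payload.get("type") == "RUN_COMPLETED":
        stats = payload.get("stats", {})
        print(f"Run {payload['runId']}: {stats.get('successful')} enriched, "
              f"{stats.get('failed')} failed")
        # payload["results"] holds the enriched leads; the full dataset can also
        # be fetched later via payload["datasetId"]

    return {"received": True}, 200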

AWS Lambda Integration

Enrich leads from S3 using AWS Lambda functions:

import boto3
import requests
import json
import os

s3 = boto3.client('s3')
APIFY_TOKEN = os.environ['APIFY_TOKEN']

def lambda_handler(event, context):
    # Read companies from S3
    obj = s3.get_object(Bucket='my-bucket', Key='leads/pending.json')
    companies = json.loads(obj['Body'].read())

    # Enrich via Apify (max 100 per run)
    batches = [companies[i:i+100] for i in range(0, len(companies), 100)]
    all_results = []

    for batch in batches:
        run = requests.post(
            "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
            params={"token": APIFY_TOKEN},
            json={
                "mode": "api",
                "companies": batch,
                "enrichment": {"sources": ["google_maps", "website"]}
            }
        ).json()
        all_results.extend(run)

    # Write enriched data back to S3
    s3.put_object(
        Bucket='my-bucket',
        Key='leads/enriched.json',
        Body=json.dumps(all_results)
    )

    return {"enriched": len(all_results)}

Apache Airflow DAG

Schedule and orchestrate lead enrichment with Apache Airflow:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests

default_args = {
    'owner': 'data-team',
    'retries': 2
}

def enrich_leads(**context):
    """Call Apify actor to enrich leads."""
    ti = context['ti']
    leads = ti.xcom_pull(task_ids='extract_leads')

    response = requests.post(
        "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
        params={"token": Variable.get("APIFY_TOKEN")},
        json={
            "mode": "api",
            "companies": leads,
            "enrichment": {"sources": ["google_maps", "website"]},
            "llm": {
                "enabled": True,
                "provider": "openai",
                "apiKey": Variable.get("OPENAI_API_KEY")
            }
        },
        timeout=300
    )

    return response.json()

with DAG(
    'lead_enrichment_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False
) as dag:

    # extract_leads and load_to_warehouse are your own callables, defined elsewhere
    extract = PythonOperator(
        task_id='extract_leads',
        python_callable=extract_leads
    )

    enrich = PythonOperator(
        task_id='enrich_leads',
        python_callable=enrich_leads
    )

    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse
    )

    extract >> enrich >> load

AWS Glue Integration

Use AWS Glue ETL jobs to enrich data from your data catalog:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import requests
import json

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'APIFY_TOKEN'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read leads from S3
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="leads_db",
    table_name="raw_leads"
)

# Convert to list of companies
companies = []
for record in datasource.toDF().collect():
    companies.append({
        "companyName": record.company_name,
        "website": record.website
    })

# Enrich in batches (max 100 per run)
enriched_results = []
for i in range(0, len(companies), 100):
    batch = companies[i:i+100]

    response = requests.post(
        "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
        params={"token": args['APIFY_TOKEN']},
        json={
            "mode": "api",
            "companies": batch,
            "enrichment": {"sources": ["google_maps", "website"]}
        },
        timeout=300
    )
    enriched_results.extend(response.json())

# Write enriched data back to S3
df = spark.createDataFrame(enriched_results)
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(df, glueContext, "enriched"),
    connection_type="s3",
    connection_options={"path": "s3://my-bucket/enriched-leads/"},
    format="parquet"
)

job.commit()

Tip: Pass APIFY_TOKEN as a Glue job parameter (as done above with getResolvedOptions) rather than hardcoding it.

Google Cloud Functions

Deploy serverless enrichment functions on Google Cloud Platform:

import functions_framework
from google.cloud import storage
import requests
import json
import os

@functions_framework.http
def enrich_leads(request):
    """HTTP Cloud Function to enrich leads from Cloud Storage."""

    storage_client = storage.Client()
    bucket = storage_client.bucket('my-leads-bucket')

    # Read pending leads
    blob = bucket.blob('leads/pending.json')
    companies = json.loads(blob.download_as_text())

    # Enrich via Apify
    enriched = []
    for i in range(0, len(companies), 100):
        batch = companies[i:i+100]

        response = requests.post(
            "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
            params={"token": os.environ['APIFY_TOKEN']},
            json={
                "mode": "api",
                "companies": batch,
                "enrichment": {"sources": ["google_maps", "website"]},
                "llm": {
                    "enabled": True,
                    "provider": "openai",
                    "apiKey": os.environ['OPENAI_API_KEY']
                }
            },
            timeout=300
        )
        enriched.extend(response.json())

    # Write enriched data
    output_blob = bucket.blob('leads/enriched.json')
    output_blob.upload_from_string(
        json.dumps(enriched, indent=2),
        content_type='application/json'
    )

    return {
        "success": True,
        "enriched": len(enriched),
        "output": "gs://my-leads-bucket/leads/enriched.json"
    }

Deploy

gcloud functions deploy enrich-leads \
  --runtime python39 \
  --trigger-http \
  --allow-unauthenticated \
  --set-env-vars APIFY_TOKEN=your_token,OPENAI_API_KEY=your_key \
  --timeout 540s \
  --memory 512MB
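
Once deployed, the function can be triggered with a plain HTTP request. A quick sketch (the URL is a placeholder; use the endpoint printed by gcloud after deployment):

import requests

FUNCTION_URL = "https://us-central1-your-project.cloudfunctions.net/enrich-leads"

response = requests.post(FUNCTION_URL, timeout=540)
print(response.json())  # e.g. {"success": true, "enriched": ..., "output": "gs://..."}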

AWS Step Functions (Durable Lambda)

For long-running enrichment jobs that exceed Lambda's 15-minute limit:

Step Functions State Machine

{
  "Comment": "Lead Enrichment Pipeline",
  "StartAt": "ReadLeads",
  "States": {
    "ReadLeads": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:read-leads-from-s3",
      "Next": "EnrichLeads"
    },
    "EnrichLeads": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:enrich-via-apify",
      "Retry": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "IntervalSeconds": 30,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Next": "WriteResults"
    },
    "WriteResults": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:write-to-s3",
      "End": true
    }
  }
}
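
The state machine can then be started from any client with Step Functions permissions. A minimal sketch using boto3 (the state machine ARN and the input shape are illustrative, not defined by the actor):

import boto3
import json

sfn = boto3.client("stepfunctions")

execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:lead-enrichment",
    input=json.dumps({"bucket": "my-bucket", "key": "leads/pending.json"})
)
print(execution["executionArn"])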

Enrich Lambda Function

import boto3
import requests
import os
import time

def lambda_handler(event, context):
    """Long-running enrichment via Step Functions."""
    companies = event['companies']

    # Start async Apify run
    run = requests.post(
        "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/runs",
        params={"token": os.environ['APIFY_TOKEN']},
        json={
            "mode": "api",
            "companies": companies,
            "enrichment": {"sources": ["google_maps", "website", "hunter"]},
            "llm": {"enabled": True, "provider": "openai", "apiKey": os.environ['OPENAI_KEY']}
        }
    ).json()

    run_id = run['data']['id']

    # Poll for completion (Step Functions handles the overall timeout)
    while True:
        status = requests.get(
            f"https://api.apify.com/v2/actor-runs/{run_id}",
            params={"token": os.environ['APIFY_TOKEN']}
        ).json()

        if status['data']['status'] in ['SUCCEEDED', 'FAILED', 'TIMED-OUT', 'ABORTED']:
            break
        time.sleep(10)

    # Get results
    dataset_id = status['data']['defaultDatasetId']
    results = requests.get(
        f"https://api.apify.com/v2/datasets/{dataset_id}/items",
        params={"token": os.environ['APIFY_TOKEN']}
    ).json()

    return {"enrichedLeads": results, "count": len(results)}

Snowflake Integration

Enrich leads directly from Snowflake tables using Python stored procedures:

-- Create stored procedure
CREATE OR REPLACE PROCEDURE enrich_leads()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python', 'requests')
HANDLER = 'enrich_leads_handler'
AS
$$
import requests
import json
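# Note: outbound HTTP calls to api.apify.com from a Snowflake Python procedure
# require an EXTERNAL ACCESS INTEGRATION to be configured and attached to it.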

def enrich_leads_handler(session):
    # Read pending leads from table
    leads_df = session.table("RAW_LEADS").filter("ENRICHED_AT IS NULL").limit(100)
    companies = []

    for row in leads_df.collect():
        companies.append({
            "companyName": row.COMPANY_NAME,
            "website": row.WEBSITE
        })

    # Enrich via Apify
    response = requests.post(
        "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
        params={"token": "YOUR_APIFY_TOKEN"},
        json={
            "mode": "api",
            "companies": companies,
            "enrichment": {"sources": ["google_maps", "website"]}
        },
        timeout=300
    )

    enriched = response.json()

    # Update each row with enriched values (escape single quotes so names
    # like O'Brien don't break the generated SQL)
    for lead in enriched:
        name = lead['companyName'].replace("'", "''")
        session.sql(f"""
            UPDATE RAW_LEADS
            SET EMAIL = '{lead.get('email', '')}',
                PHONE = '{lead.get('phone', '')}',
                RATING = {lead.get('rating') or 0},
                LEAD_SCORE = {lead.get('leadScore') or 0},
                ENRICHED_AT = CURRENT_TIMESTAMP()
            WHERE COMPANY_NAME = '{name}'
        """).collect()

    return f"Enriched {len(enriched)} leads"
$$;

-- Schedule with task
CREATE OR REPLACE TASK enrich_leads_daily
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = 'USING CRON 0 2 * * * UTC'
AS
  CALL enrich_leads();

ALTER TASK enrich_leads_daily RESUME;

Amazon Redshift Integration

Enrich leads from Redshift using Lambda + S3 unload:

Step 1: Unload to S3

-- Unload pending leads to S3
UNLOAD (
  'SELECT company_name, website
   FROM leads
   WHERE enriched_at IS NULL
   LIMIT 1000'
)
TO 's3://my-bucket/leads/pending_'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftS3Role'
FORMAT AS JSON
PARALLEL OFF;

Step 2: Lambda Enrichment

import boto3
import requests
import json
import os
import psycopg2

def lambda_handler(event, context):
    s3 = boto3.client('s3')

    # Read from S3
    obj = s3.get_object(Bucket='my-bucket', Key='leads/pending_0000')
    companies = [json.loads(line) for line in obj['Body'].read().decode().splitlines()]

    # Enrich
    response = requests.post(
        "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
        params={"token": os.environ['APIFY_TOKEN']},
        json={"mode": "api", "companies": companies},
        timeout=300
    )
    enriched = response.json()

    # Write back to S3
    s3.put_object(
        Bucket='my-bucket',
        Key='leads/enriched.json',
        Body=json.dumps(enriched)
    )

    # Copy to Redshift
    conn = psycopg2.connect(
        host=os.environ['REDSHIFT_HOST'],
        port=5439,
        dbname='analytics',
        user=os.environ['REDSHIFT_USER'],
        password=os.environ['REDSHIFT_PASSWORD']
    )

    with conn.cursor() as cur:
        cur.execute(f"""
            COPY enriched_leads
            FROM 's3://my-bucket/leads/enriched.json'
            IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftS3Role'
            FORMAT AS JSON 'auto';
        """)
        conn.commit()

    return {"enriched": len(enriched)}

Data Lake Pattern (S3)

Write enriched data to partitioned data lakes for analytics:

# Write enriched data to a partitioned data lake for analytics
import boto3
import pandas as pd
from datetime import datetime

s3 = boto3.client('s3')
df = pd.DataFrame(enriched_results)  # enriched results from any example above
now = datetime.now()

# Object keys are relative to the bucket, partitioned by year/month
key_prefix = f"enriched-leads/year={now.year}/month={now.month:02d}/"

s3.put_object(
    Bucket='datalake',
    Key=f"{key_prefix}leads_{now.strftime('%Y%m%d_%H%M%S')}.parquet",
    Body=df.to_parquet()  # returns bytes when no path is given
)
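
Once the files are laid out by year and month, they can be queried in place, for example with Athena. A rough sketch using boto3 (the database, table, and results bucket are assumptions; an Athena table over the enriched-leads prefix must already exist):

import boto3

athena = boto3.client("athena")

query = athena.start_query_execution(
    QueryString="""
        SELECT companyName, leadScore, dataConfidence
        FROM enriched_leads
        WHERE year = '2025' AND month = '12' AND leadScore >= 70
    """,
    QueryExecutionContext={"Database": "leads_db"},
    ResultConfiguration={"OutputLocation": "s3://datalake/athena-results/"}
)
print(query["QueryExecutionId"])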

Rate Limits & Batching

Understanding limits and how to handle large-scale enrichment:

Limit             Value
Companies/run     100
Concurrent runs   10
Run timeout       1 hour

Batch Processing (1000+ leads)

import asyncio
import aiohttp

ACTOR_URL = "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items"

async def enrich_batch(session, batch, token, semaphore):
    """Enrich one batch of up to 100 companies via the sync endpoint."""
    async with semaphore:
        payload = {"mode": "api", "companies": batch}
        async with session.post(ACTOR_URL, params={"token": token}, json=payload) as resp:
            return await resp.json()

async def enrich_all(companies, token, max_concurrent=10):
    """Enrich all companies, capping concurrency at the 10 concurrent-run limit."""
    batches = [companies[i:i+100] for i in range(0, len(companies), 100)]
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        tasks = [enrich_batch(session, batch, token, semaphore) for batch in batches]
        results = await asyncio.gather(*tasks)

    return [item for batch_results in results for item in batch_results]

# Usage: enriched = asyncio.run(enrich_all(companies, APIFY_TOKEN))

Cost Estimation

Estimated costs for different batch sizes:

Companies   Apify Cost   LLM Cost   Total
100         $3.00        $0.10      $3.10
1,000       $30.00       $1.00      $31.00
10,000      $300.00      $10.00     $310.00

Note: LLM costs assume GPT-4o-mini (~$0.001 per lead). Hunter.io costs are separate if enabled.
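
For planning, the per-lead rates in the table can be turned into a quick estimator. A small sketch (the rates simply mirror the table above; actual billing may differ):

def estimate_cost(num_companies, apify_per_lead=0.03, llm_per_lead=0.001):
    """Rough cost estimate using the per-lead rates from the table above."""
    apify_cost = num_companies * apify_per_lead
    llm_cost = num_companies * llm_per_lead
    return {
        "apify": round(apify_cost, 2),
        "llm": round(llm_cost, 2),
        "total": round(apify_cost + llm_cost, 2),
    }

print(estimate_cost(2500))  # {'apify': 75.0, 'llm': 2.5, 'total': 77.5}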
