API Integrations
Use without Airtable via API mode
The Airtable Lead Enricher can be used as a standalone API service without requiring an Airtable base. This enables integration with any data pipeline, workflow automation, or business intelligence tool.
API Mode Overview
| Feature | Value |
|---|---|
| Max companies/run | 100 |
| Input | JSON array |
| Output | Dataset + Webhook |
| Airtable required | No |
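Before the full examples, the smallest valid API-mode input is just the mode flag and a companies array; the other settings (enrichment, llm, webhookUrl) are shown in the examples below. A minimal sketch, using the same field names as those examples (the variable name is illustrative):
# Minimal API-mode run input, shown as a Python dict; it is sent as the JSON body of the run request
minimal_input = {
    "mode": "api",
    "companies": [
        {"companyName": "Acme Corp", "website": "https://acme.com"}
    ]
}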
Python Example
Enrich leads using Python with both asynchronous and synchronous approaches:
Asynchronous (Recommended for Large Batches)
import requests
import time
APIFY_TOKEN = "your_token"
# Start run
run = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/runs",
params={"token": APIFY_TOKEN},
json={
"mode": "api",
"companies": [
{"companyName": "Acme Corp", "website": "https://acme.com"}
],
"enrichment": {
"sources": ["google_maps", "website", "hunter"],
"hunter": {
"enabled": True,
"apiKey": "YOUR_HUNTER_KEY"
}
},
"llm": {
"enabled": True,
"provider": "openai",
"apiKey": "YOUR_OPENAI_KEY"
}
}
).json()
# Wait for completion
run_id = run["data"]["id"]
while True:
status = requests.get(
f"https://api.apify.com/v2/actor-runs/{run_id}",
params={"token": APIFY_TOKEN}
).json()
if status["data"]["status"] in ["SUCCEEDED", "FAILED"]:
break
time.sleep(5)
# Get results
dataset_id = status["data"]["defaultDatasetId"]
results = requests.get(
f"https://api.apify.com/v2/datasets/{dataset_id}/items",
params={"token": APIFY_TOKEN}
).json()
print(results)
Synchronous (Wait for Results)
response = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
params={"token": APIFY_TOKEN},
json={"mode": "api", "companies": [...]},
timeout=300
)
results = response.json()
Node.js Example
Enrich leads using the Apify JavaScript client:
const { ApifyClient } = require('apify-client');
const client = new ApifyClient({ token: 'your_token' });
async function enrichLeads(companies) {
const run = await client.actor('datahq/airtable-lead-enricher').call({
mode: 'api',
companies: companies,
enrichment: {
sources: ['google_maps', 'website', 'hunter'],
hunter: {
enabled: true,
apiKey: 'YOUR_HUNTER_KEY'
}
},
llm: {
enabled: true,
provider: 'openai',
apiKey: 'YOUR_OPENAI_KEY'
}
});
const { items } = await client.dataset(run.defaultDatasetId).listItems();
return items;
}
// Usage
const companies = [
{ companyName: 'Acme Corp', website: 'https://acme.com' }
];
enrichLeads(companies).then(results => {
console.log(results);
});
Output Schema
Each enriched lead returns the following JSON structure:
{
"companyName": "Acme Corp",
"website": "https://acme.com",
"location": "San Francisco, CA",
"email": "contact@acme.com",
"phone": "+1 555 0100",
"address": "123 Main St, San Francisco, CA",
"rating": 4.5,
"reviewCount": 127,
"linkedinUrl": "https://linkedin.com/company/acme",
"facebookUrl": "https://facebook.com/acme",
"twitterHandle": "@acme",
"industry": "Technology",
"leadScore": 85,
"icpScore": 36,
"icpReasoning": "Strong tech stack alignment...",
"summary": "Acme Corp is an enterprise...",
"dataConfidence": 0.87,
"enrichedAt": "2025-12-19T14:30:00Z",
"enrichmentSources": ["google_maps", "website", "hunter"]
}
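Because every item follows this schema, downstream filtering is straightforward. A minimal sketch that keeps only high-quality leads, reusing the results variable from the Python example above (the thresholds are arbitrary examples, not recommendations):
# Filter enriched items by score and confidence (thresholds are illustrative)
qualified = [
    lead for lead in results
    if lead.get("leadScore", 0) >= 70 and lead.get("dataConfidence", 0) >= 0.8
]
print(f"{len(qualified)} of {len(results)} leads passed the quality bar")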
Webhook Integration
Receive enrichment results via webhook for event-driven architectures:
Configure Webhook
{
"mode": "api",
"companies": [...],
"webhookUrl": "https://your-system.com/webhook"
}
Webhook Payload
When the enrichment completes, you'll receive:
{
"type": "RUN_COMPLETED",
"runId": "abc123",
"datasetId": "xyz789",
"timestamp": "2025-12-19T14:35:00Z",
"stats": {
"totalCompanies": 50,
"successful": 47,
"failed": 3,
"avgLeadScore": 72.4,
"avgDataConfidence": 0.81
},
"results": [...]
}
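On the receiving side, any HTTP endpoint that accepts a JSON POST will do. A minimal sketch using Flask (Flask and the /webhook route are assumptions, not part of the actor):
# Minimal webhook receiver -- Flask is an assumption; swap in your framework of choice
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/webhook", methods=["POST"])
def handle_enrichment_webhook():
    payload = request.get_json(force=True)
    if payload.get("type") == "RUN_COMPLETED":
        stats = payload.get("stats", {})
        print(f"Run {payload.get('runId')} finished: "
              f"{stats.get('successful', 0)}/{stats.get('totalCompanies', 0)} enriched")
        # payload["results"] holds the enriched leads; persist them here
    return jsonify({"received": True}), 200

if __name__ == "__main__":
    app.run(port=8080)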
AWS Lambda Integration
Enrich leads from S3 using AWS Lambda functions:
import boto3
import requests
import json
import os
s3 = boto3.client('s3')
APIFY_TOKEN = os.environ['APIFY_TOKEN']
def lambda_handler(event, context):
# Read companies from S3
obj = s3.get_object(Bucket='my-bucket', Key='leads/pending.json')
companies = json.loads(obj['Body'].read())
# Enrich via Apify (max 100 per run)
batches = [companies[i:i+100] for i in range(0, len(companies), 100)]
all_results = []
for batch in batches:
run = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
params={"token": APIFY_TOKEN},
json={
"mode": "api",
"companies": batch,
"enrichment": {"sources": ["google_maps", "website"]}
}
).json()
all_results.extend(run)
# Write enriched data back to S3
s3.put_object(
Bucket='my-bucket',
Key='leads/enriched.json',
Body=json.dumps(all_results)
)
return {"enriched": len(all_results)}
Apache Airflow DAG
Schedule and orchestrate lead enrichment with Apache Airflow:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
import requests
default_args = {
'owner': 'data-team',
'retries': 2
}
def enrich_leads(**context):
"""Call Apify actor to enrich leads."""
ti = context['ti']
leads = ti.xcom_pull(task_ids='extract_leads')
response = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
params={"token": Variable.get("APIFY_TOKEN")},
json={
"mode": "api",
"companies": leads,
"enrichment": {"sources": ["google_maps", "website"]},
"llm": {
"enabled": True,
"provider": "openai",
"apiKey": Variable.get("OPENAI_API_KEY")
}
},
timeout=300
)
return response.json()
with DAG(
'lead_enrichment_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # Daily at 2 AM
start_date=datetime(2025, 1, 1),
catchup=False
) as dag:
extract = PythonOperator(
task_id='extract_leads',
python_callable=extract_leads
)
enrich = PythonOperator(
task_id='enrich_leads',
python_callable=enrich_leads
)
load = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_warehouse
)
extract >> enrich >> load
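The extract_leads and load_to_warehouse callables are left to your pipeline. Illustrative stubs (defined above the DAG in practice; the hardcoded company list and the print are placeholders for your own source query and warehouse load) might look like:
# Illustrative stubs only -- replace with your own source query and warehouse load
def extract_leads(**context):
    """Return the companies to enrich; the return value is pushed to XCom for the enrich task."""
    return [
        {"companyName": "Acme Corp", "website": "https://acme.com"},
    ]

def load_to_warehouse(**context):
    """Read the enriched leads from XCom and hand them to your warehouse loader."""
    enriched = context["ti"].xcom_pull(task_ids="enrich_leads")
    print(f"Loading {len(enriched)} enriched leads")  # replace with an actual insert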
AWS Glue Integration
Use AWS Glue ETL jobs to enrich data from your data catalog:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import requests
import json
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'APIFY_TOKEN'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read leads from S3
datasource = glueContext.create_dynamic_frame.from_catalog(
database="leads_db",
table_name="raw_leads"
)
# Convert to list of companies
companies = []
for record in datasource.toDF().collect():
companies.append({
"companyName": record.company_name,
"website": record.website
})
# Enrich in batches (max 100 per run)
enriched_results = []
for i in range(0, len(companies), 100):
batch = companies[i:i+100]
response = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
params={"token": args['APIFY_TOKEN']},
json={
"mode": "api",
"companies": batch,
"enrichment": {"sources": ["google_maps", "website"]}
},
timeout=300
)
enriched_results.extend(response.json())
# Write enriched data back to S3
df = spark.createDataFrame(enriched_results)
glueContext.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(df, glueContext, "enriched"),
connection_type="s3",
connection_options={"path": "s3://my-bucket/enriched-leads/"},
format="parquet"
)
job.commit()
Google Cloud Functions
Deploy serverless enrichment functions on Google Cloud Platform:
import functions_framework
from google.cloud import storage
import requests
import json
import os
@functions_framework.http
def enrich_leads(request):
"""HTTP Cloud Function to enrich leads from Cloud Storage."""
storage_client = storage.Client()
bucket = storage_client.bucket('my-leads-bucket')
# Read pending leads
blob = bucket.blob('leads/pending.json')
companies = json.loads(blob.download_as_text())
# Enrich via Apify
enriched = []
for i in range(0, len(companies), 100):
batch = companies[i:i+100]
response = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
params={"token": os.environ['APIFY_TOKEN']},
json={
"mode": "api",
"companies": batch,
"enrichment": {"sources": ["google_maps", "website"]},
"llm": {
"enabled": True,
"provider": "openai",
"apiKey": os.environ['OPENAI_API_KEY']
}
},
timeout=300
)
enriched.extend(response.json())
# Write enriched data
output_blob = bucket.blob('leads/enriched.json')
output_blob.upload_from_string(
json.dumps(enriched, indent=2),
content_type='application/json'
)
return {
"success": True,
"enriched": len(enriched),
"output": "gs://my-leads-bucket/leads/enriched.json"
}
Deploy
gcloud functions deploy enrich-leads \
--runtime python39 \
--trigger-http \
--allow-unauthenticated \
--set-env-vars APIFY_TOKEN=your_token,OPENAI_API_KEY=your_key \
--timeout 540s \
--memory 512MB
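Once deployed, the function is triggered with a plain HTTP request. A minimal sketch (the URL is illustrative; use the one printed by gcloud after deployment):
# Trigger the deployed Cloud Function (URL is illustrative)
import requests

resp = requests.post(
    "https://us-central1-my-project.cloudfunctions.net/enrich-leads",
    timeout=540,
)
print(resp.json())  # {"success": True, "enriched": ..., "output": "gs://..."}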
AWS Step Functions (Durable Lambda)
For long-running enrichment jobs that exceed Lambda's 15-minute limit:
Step Functions State Machine
{
"Comment": "Lead Enrichment Pipeline",
"StartAt": "ReadLeads",
"States": {
"ReadLeads": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:read-leads-from-s3",
"Next": "EnrichLeads"
},
"EnrichLeads": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:enrich-via-apify",
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 30,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Next": "WriteResults"
},
"WriteResults": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:write-to-s3",
"End": true
}
}
}
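Kicking off the state machine is a single boto3 call. A minimal sketch (the state machine ARN and input shape are illustrative):
# Start the enrichment state machine (ARN and input are illustrative)
import json
import boto3

sfn = boto3.client("stepfunctions")
execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:lead-enrichment",
    input=json.dumps({"bucket": "my-bucket", "key": "leads/pending.json"}),
)
print(execution["executionArn"])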
Enrich Lambda Function
import boto3
import requests
import time
import os

def lambda_handler(event, context):
    """Long-running enrichment via Step Functions."""
    companies = event['companies']
    # Start async Apify run
    run = requests.post(
        "https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/runs",
        params={"token": os.environ['APIFY_TOKEN']},
        json={
            "mode": "api",
            "companies": companies,
            "enrichment": {"sources": ["google_maps", "website", "hunter"]},
            "llm": {"enabled": True, "provider": "openai", "apiKey": os.environ['OPENAI_KEY']}
        }
    ).json()
    run_id = run['data']['id']
    # Poll for completion (Step Functions handles the overall timeout and retries)
    while True:
        status = requests.get(
            f"https://api.apify.com/v2/actor-runs/{run_id}",
            params={"token": os.environ['APIFY_TOKEN']}
        ).json()
        if status['data']['status'] in ['SUCCEEDED', 'FAILED']:
            break
        time.sleep(10)  # avoid hammering the API while the run is in progress
    # Get results
    dataset_id = status['data']['defaultDatasetId']
    results = requests.get(
        f"https://api.apify.com/v2/datasets/{dataset_id}/items",
        params={"token": os.environ['APIFY_TOKEN']}
    ).json()
    return {"enrichedLeads": results, "count": len(results)}
Snowflake Integration
Enrich leads directly from Snowflake tables using Python stored procedures. Note that outbound HTTP calls from Snowpark Python procedures require an external access integration to be configured and attached to the procedure; the example below omits that setup for brevity:
-- Create stored procedure
CREATE OR REPLACE PROCEDURE enrich_leads()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python', 'requests')
HANDLER = 'enrich_leads_handler'
AS
$$
import requests
import json
def enrich_leads_handler(session):
# Read pending leads from table
leads_df = session.table("RAW_LEADS").filter("ENRICHED_AT IS NULL").limit(100)
companies = []
for row in leads_df.collect():
companies.append({
"companyName": row.COMPANY_NAME,
"website": row.WEBSITE
})
# Enrich via Apify
response = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
params={"token": "YOUR_APIFY_TOKEN"},
json={
"mode": "api",
"companies": companies,
"enrichment": {"sources": ["google_maps", "website"]}
},
timeout=300
)
enriched = response.json()
# Insert enriched data
for lead in enriched:
session.sql(f"""
UPDATE RAW_LEADS
SET EMAIL = '{lead.get('email', '')}',
PHONE = '{lead.get('phone', '')}',
RATING = {lead.get('rating', 0)},
LEAD_SCORE = {lead.get('leadScore', 0)},
ENRICHED_AT = CURRENT_TIMESTAMP()
WHERE COMPANY_NAME = '{lead['companyName']}'
""").collect()
return f"Enriched {len(enriched)} leads"
$$;
-- Schedule with task
CREATE OR REPLACE TASK enrich_leads_daily
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 2 * * * UTC'
AS
CALL enrich_leads();
ALTER TASK enrich_leads_daily RESUME;
Amazon Redshift Integration
Enrich leads from Redshift using Lambda + S3 unload:
Step 1: Unload to S3
-- Unload pending leads to S3
UNLOAD (
'SELECT company_name, website
FROM leads
WHERE enriched_at IS NULL
LIMIT 1000'
)
TO 's3://my-bucket/leads/pending_'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftS3Role'
FORMAT AS JSON
PARALLEL OFF;
Step 2: Lambda Enrichment
import boto3
import requests
import json
import os
import psycopg2
def lambda_handler(event, context):
s3 = boto3.client('s3')
# Read from S3
obj = s3.get_object(Bucket='my-bucket', Key='leads/pending_0000')
companies = [json.loads(line) for line in obj['Body'].read().decode().splitlines()]
# Enrich
response = requests.post(
"https://api.apify.com/v2/acts/datahq~airtable-lead-enricher/run-sync-get-dataset-items",
params={"token": os.environ['APIFY_TOKEN']},
json={"mode": "api", "companies": companies},
timeout=300
)
enriched = response.json()
# Write back to S3
s3.put_object(
Bucket='my-bucket',
Key='leads/enriched.json',
Body=json.dumps(enriched)
)
# Copy to Redshift
conn = psycopg2.connect(
host=os.environ['REDSHIFT_HOST'],
port=5439,
dbname='analytics',
user=os.environ['REDSHIFT_USER'],
password=os.environ['REDSHIFT_PASSWORD']
)
with conn.cursor() as cur:
cur.execute(f"""
COPY enriched_leads
FROM 's3://my-bucket/leads/enriched.json'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftS3Role'
FORMAT AS JSON 'auto';
""")
conn.commit()
return {"enriched": len(enriched)}
Data Lake Pattern (S3)
Write enriched data to partitioned data lakes for analytics:
# Write enriched data to a partitioned data lake (Hive-style year/month partitions)
from datetime import datetime
import boto3
import pandas as pd

s3 = boto3.client('s3')
df = pd.DataFrame(enriched_results)  # output from any enrichment example above
now = datetime.now()
key = f"enriched-leads/year={now.year}/month={now.month:02d}/leads_{now.strftime('%Y%m%d_%H%M%S')}.parquet"
s3.put_object(
    Bucket='datalake',
    Key=key,  # the bucket is passed separately, so the key must not repeat the s3:// prefix
    Body=df.to_parquet()  # serializes to Parquet bytes; requires pyarrow
)
Rate Limits & Batching
Keep the following limits in mind when planning large-scale enrichment:
| Limit | Value |
|---|---|
| Companies/run | 100 |
| Concurrent runs | 10 |
| Run timeout | 1 hour |
Batch Processing (1000+ leads)
import asyncio
import aiohttp

async def enrich_all(companies, token, max_concurrency=10):
    """Enrich all companies in 100-company batches, capped at 10 concurrent runs."""
    batches = [companies[i:i + 100] for i in range(0, len(companies), 100)]
    semaphore = asyncio.Semaphore(max_concurrency)  # respect the concurrent-run limit

    async with aiohttp.ClientSession() as session:
        async def limited(batch):
            async with semaphore:
                return await enrich_batch(session, batch, token)

        results = await asyncio.gather(*(limited(batch) for batch in batches))
    return [item for batch_results in results for item in batch_results]
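enrich_batch is left undefined above; a minimal sketch of it using the synchronous run endpoint (error handling kept deliberately minimal) might be:
async def enrich_batch(session, batch, token):
    """Enrich one batch of up to 100 companies via the synchronous run endpoint."""
    url = ("https://api.apify.com/v2/acts/datahq~airtable-lead-enricher"
           "/run-sync-get-dataset-items")
    async with session.post(
        url,
        params={"token": token},
        json={"mode": "api", "companies": batch},
        timeout=aiohttp.ClientTimeout(total=300),
    ) as response:
        response.raise_for_status()
        return await response.json()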
Cost Estimation
Estimated costs for different batch sizes:
| Companies | Apify Cost | LLM Cost | Total |
|---|---|---|---|
| 100 | $3.00 | $0.10 | $3.10 |
| 1,000 | $30.00 | $1.00 | $31.00 |
| 10,000 | $300.00 | $10.00 | $310.00 |
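Costs scale roughly linearly with company count, so a quick estimate can be computed up front. A sketch assuming the per-company rates implied by the table ($0.03 Apify and $0.001 LLM per company):
# Rough cost estimate based on the per-company rates implied by the table above
def estimate_cost(num_companies, apify_per_company=0.03, llm_per_company=0.001):
    apify_cost = num_companies * apify_per_company
    llm_cost = num_companies * llm_per_company
    return {"apify": round(apify_cost, 2), "llm": round(llm_cost, 2),
            "total": round(apify_cost + llm_cost, 2)}

print(estimate_cost(2500))  # {'apify': 75.0, 'llm': 2.5, 'total': 77.5}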