Uber Eats Account Gen
Mar 25, 2025
UBER EATS ACCOUNT GEN
The code is split into two main files
- main.py - The entry point and coordination logic
- worker.py - The core implementation that handles the actual signup process
The system automates the UberEats account creation process with these key features
- Creates multiple accounts in parallel using threading
- Handles email verification codes
- Manages proxies to avoid rate limiting
- Implements browser fingerprinting to mimic real users
- Has extensive error handling and debugging
1. Initialization
- Sets up a session with browser fingerprinting
- Configures proxies if provided
- Generates random emails with the domain provided in the dotenv
2. Email Submission
- Submits the email to UberEats signup endpoint
- Handles any CAPTCHA challenges
- Extracts the authentication session ID from the response
3. Verification
- Checks for verification emails using IMAP
- Extracts the verification code
- Submits the code to complete registration
4. Completion
- Saves successful accounts to files
- Records statistics about success/failure rates
Technical Implementation
The code implements several advanced techniques
- Asynchronous programming with asyncio for performance
- Detailed session management with proper headers and cookies
- CSRF token extraction and handling
- Advanced error recovery mechanisms
- Extensive logging for debugging
Content exists: Yes
Raw content exists: No
Raw content length: 0
Raw content exists: No
Raw content length: 0
UBER EATS ACCOUNT GEN
The code is split into two main files
- main.py - The entry point and coordination logic
- worker.py - The core implementation that handles the actual signup process
The system automates the UberEats account creation process with these key features
- Creates multiple accounts in parallel using threading
- Handles email verification codes
- Manages proxies to avoid rate limiting
- Implements browser fingerprinting to mimic real users
- Has extensive error handling and debugging
1. Initialization
- Sets up a session with browser fingerprinting
- Configures proxies if provided
- Generates random emails with the domain provided in the dotenv
2. Email Submission
- Submits the email to UberEats signup endpoint
- Handles any CAPTCHA challenges
- Extracts the authentication session ID from the response
3. Verification
- Checks for verification emails using IMAP
- Extracts the verification code
- Submits the code to complete registration
4. Completion
- Saves successful accounts to files
- Records statistics about success/failure rates
Technical Implementation
The code implements several advanced techniques
- Asynchronous programming with asyncio for performance
- Detailed session management with proper headers and cookies
- CSRF token extraction and handling
- Advanced error recovery mechanisms
- Extensive logging for debugging
Worker.py Module
This module handles the core implementation of the account creation process
- Manages HTTP sessions with Uber's authentication servers
- Generates random emails with realistic patterns
- Handles verification code retrieval via IMAP
- Implements browser fingerprinting to bypass detection
1. Session Management
- Creates custom UberSession class for HTTP interaction
- Manages cookies, headers, and authentication tokens
- Extracts CSRF tokens from responses
- Maintains session state across multiple requests
2. Email Generation & Verification
- Creates randomized emails with varied patterns (firstname.lastname, username_123)
- Connects to IMAP servers to retrieve verification emails
- Uses regex patterns to extract verification codes
- Handles different email formats and providers
3. Request Handling
- Constructs proper JSON payloads for Uber's API endpoints
- Manages authentication session IDs across requests
- Solves CAPTCHAs using 2Captcha integration
- Implements retry mechanisms with exponential backoff
4. Debug & Recovery
- Saves detailed response data for debugging
- Includes comprehensive error handling
- Implements multiple verification attempts with different endpoints
- Provides detailed logging at multiple verbosity levels
Core Functions
The module exports several key functions:
- signup_flow() - The main entry point that coordinates the entire process
- load_proxies() - Loads and validates proxy configurations
- get_random_proxy() - Selects and formats a random proxy from the list
- fetch_verification_code_via_imap() - Retrieves verification codes from email
- solve_recaptcha_v2() - Handles CAPTCHA challenges via external API
import asyncio
import concurrent.futures
import os
import sys
import threading
import time
import traceback
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from worker import signup_flow, load_proxies, get_random_proxy
class UberEatsSignupManager:
def __init__(self, num_accounts, max_threads=10):
self.num_accounts = num_accounts
self.max_threads = min(max_threads, num_accounts)
self.successful_accounts = 0
self.failed_accounts = 0
self.lock = threading.Lock()
self.proxies = load_proxies()
os.makedirs("debug_data", exist_ok=True)
os.makedirs("accounts", exist_ok=True)
os.makedirs("logs", exist_ok=True)
async def run_signup(self, proxy=None):
try:
result = await signup_flow(email=None, proxy=proxy)
with self.lock:
if result:
self.successful_accounts += 1
else:
self.failed_accounts += 1
return result
except Exception as e:
with self.lock:
self.failed_accounts += 1
logging.error(f"Error in run_signup: {str(e)}")
return False
async def process_account(self):
proxy = get_random_proxy(self.proxies) if self.proxies else None
return await self.run_signup(proxy)
async def run(self):
tasks = set()
for _ in range(self.num_accounts):
if len(tasks) >= self.max_threads:
done, tasks = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
task = asyncio.create_task(self.process_account())
tasks.add(task)
await asyncio.sleep(random.uniform(1.0, 3.0))
if tasks:
await asyncio.wait(tasks)
return self.successful_accounts, self.failed_accounts
import httpx
import random
import string
import json
import re
import time
import os
import logging
import asyncio
import imaplib
import email
import datetime
import base64
import traceback
import names
import uuid
import email.utils
from email.header import decode_header
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("ubereats_bot.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Constants
UBER_HOST = "www.ubereats.com"
AUTH_HOST = "auth.uber.com"
GEO_HOST = "cn-geo1.uber.com"
BASE_URL = f"https://{UBER_HOST}"
SUBMIT_FORM_URL = f"https://{AUTH_HOST}/v2/submit-form"
VERIFY_URL = f"https://{AUTH_HOST}/v3/silkid/verify"
SUBMIT_OTP_URL = f"https://{GEO_HOST}/rt/silk-screen/submit-form"
# Global variable to store the current session ID
CURRENT_SESSION_ID = None
# Content type constants
CONTENT_TYPE_JSON = "application/json"
ACCEPT_ALL = "*/*"
# Form field constants
EMAIL_OTP_CODE = "EMAIL_OTP_CODE"
EMAIL_ADDRESS = "EMAIL_ADDRESS"
FLOW_TYPE_INITIAL = "INITIAL"
FLOW_TYPE_SIGN_UP = "SIGN_UP"
# Configuration
CAPTCHA_API_KEY = os.getenv('CAPTCHA_API_KEY')
EMAIL_SETTINGS = {
"imap_server": os.getenv('EMAIL_IMAP_SERVER', 'imap.gmail.com'),
"imap_port": int(os.getenv('EMAIL_IMAP_PORT', '993')),
"email": os.getenv('EMAIL_USER', 'your_email@gmail.com'),
"password": os.getenv('EMAIL_APP_PASSWORD', 'your_app_password')
}
# Browser fingerprint configuration
FINGERPRINT_CONFIG = {
"platform": "Win32",
"appVersion": "5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"screen": {"width": 1920, "height": 1080},
"deviceMemory": 8,
"hardwareConcurrency": 4,
"cookieEnabled": True,
"language": "en-US",
"languages": ["en-US", "en"],
"timezone": "America/New_York",
"timezoneOffset": -240
}
# Create directories for logging and debug data
os.makedirs("debug_data", exist_ok=True)
os.makedirs("accounts", exist_ok=True)
os.makedirs("logs", exist_ok=True)
def generate_uuid():
"""Generate standard UUIDv4 identifier (36 characters)"""
# Use Python's built-in UUID module instead of our custom implementation
import uuid
return str(uuid.uuid4())
def generate_session_id():
"""Generate a session ID in the format Uber expects (two UUIDs with underscore)"""
import uuid
return f"{uuid.uuid4()}_{uuid.uuid4()}"
def generate_random_email():
"""Generate random email address with configurable domains and formats"""
# List of domains to use randomly - update this with your actual forwarding domains
domains = ['doggydascaryllc.com'] # Replace with your forwarding domains
# Generate username with various patterns to avoid detection
username_patterns = [
# Standard random lowercase
lambda: ''.join(random.choices(string.ascii_lowercase, k=random.randint(8, 12))),
# Firstname.lastname format
lambda: random.choice(['john', 'jane', 'alex', 'emma', 'mike', 'sarah', 'david', 'lisa']) +
'.' +
random.choice(['smith', 'doe', 'jones', 'wilson', 'brown', 'taylor', 'davis', 'clark']),
# Firstname with numbers
lambda: random.choice(['john', 'jane', 'alex', 'emma', 'mike', 'sarah', 'david', 'lisa']) +
''.join(random.choices(string.digits, k=random.randint(2, 4))),
# Random pattern with underscores
lambda: ''.join(random.choices(string.ascii_lowercase, k=random.randint(4, 7))) +
'_' +
''.join(random.choices(string.ascii_lowercase, k=random.randint(3, 6)))
]
# Choose a random pattern
username_generator = random.choice(username_patterns)
username = username_generator()
# Create the email address
domain = random.choice(domains)
email = f"{username}@{domain}"
logger.info(f"Generated email: {email}")
return email
async def connect_to_imap_server(imap_server, imap_port, imap_user, imap_password):
"""Connect to IMAP server with proper error handling"""
try:
# Connect to the IMAP server
logger.debug(f"Attempting IMAP connection to {imap_server}:{imap_port}")
mail = imaplib.IMAP4_SSL(imap_server, imap_port)
# Try to login - this is where most errors occur
logger.debug(f"Logging in as {imap_user}")
mail.login(imap_user, imap_password)
logger.info("IMAP login successful")
mail.select("INBOX")
return mail, None
except imaplib.IMAP4.error as e:
logger.error(f"IMAP error: {str(e)}")
logger.error("Check your IMAP credentials and settings in .env file")
logger.error("For Gmail, use an App Password instead of your regular password")
return None, str(e)
except Exception as e:
logger.error(f"Unexpected IMAP connection error: {str(e)}")
return None, str(e)
def safely_close_imap_connection(mail):
"""Safely close IMAP connection with error handling"""
if mail:
try:
mail.close()
mail.logout()
logger.debug("IMAP connection closed successfully")
except imaplib.IMAP4.error as e:
logger.warning(f"Error closing IMAP connection: {str(e)}")
except Exception as e:
logger.warning(f"Unexpected error closing IMAP connection: {str(e)}")
async def extract_email_body(msg):
"""Extract body from email message with error handling"""
body = ""
try:
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# Skip attachments
if "attachment" in content_disposition:
continue
if content_type == "text/plain" or content_type == "text/html":
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or 'utf-8'
body += payload.decode(charset, errors='replace')
except Exception as e:
logger.error(f"Error decoding part: {str(e)}")
continue
else:
try:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or 'utf-8'
body = payload.decode(charset, errors='replace')
except Exception as e:
logger.error(f"Error decoding message: {str(e)}")
except Exception as e:
logger.error(f"Error extracting email body: {str(e)}")
return body
async def find_verification_code(body, patterns):
"""Find verification code in email body using patterns"""
for pattern in patterns:
matches = re.findall(pattern, body, re.IGNORECASE)
if matches:
# Take the first match
code = matches[0]
logger.info(f"Found verification code: {code} using pattern {pattern}")
return code
return None
async def check_inbox_for_code(mail, email_address, patterns):
"""Search through inbox messages for verification code"""
try:
# Get the timestamp for recent emails (last 10 minutes)
ten_min_ago = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%d-%b-%Y")
# Search for recent emails
logger.debug(f"Searching for emails since {ten_min_ago}")
status, data = mail.search(None, f'(SINCE "{ten_min_ago}")')
if status != 'OK':
logger.warning(f"Search failed with status: {status}")
# Try a broader search
status, data = mail.search(None, 'ALL')
if status == 'OK' and data[0]:
email_ids = data[0].split()
# Start with the most recent emails
logger.info(f"Found {len(email_ids)} emails to check")
for email_id in reversed(email_ids):
status, data = mail.fetch(email_id, '(RFC822)')
if status == 'OK':
raw_email = data[0][1]
# Parse the email
msg = email.message_from_bytes(raw_email)
# Log email subject for debugging
subject = msg.get("Subject", "")
from_addr = msg.get("From", "")
to_addr = msg.get("To", "")
if isinstance(subject, bytes):
subject = subject.decode()
logger.info(f"Checking email: Subject='{subject}' From='{from_addr}' To='{to_addr}'")
# Check if the email is addressed to our target email address
# This is critical to ensure we use the correct verification code
if email_address.lower() not in to_addr.lower():
logger.info(f"Skipping email not addressed to {email_address}")
continue
# Check if it's potentially an Uber email
if "uber" in from_addr.lower() or "verification" in subject.lower() or "code" in subject.lower():
logger.info("Found potential Uber verification email for current account")
# Extract the email body
body = await extract_email_body(msg)
# Save the email body for inspection
email_filename = f"debug_data/email_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
with open(email_filename, "w", encoding="utf-8") as f:
f.write(body)
logger.info(f"Saved email body to {email_filename}")
# Look for verification code in the message content
code = await find_verification_code(body, patterns)
if code:
logger.info(f"Found verification code {code} for {email_address}")
return code
except Exception as e:
logger.error(f"Error checking inbox: {str(e)}")
return None
async def fetch_verification_code_via_imap(email_address, timeout=180):
"""
Fetch verification code from email using IMAP with enhanced error handling
Args:
email_address: The full email address
timeout: How long to wait for the email (in seconds)
Returns:
The verification code if found, or None if not found
"""
logger.info(f"Checking IMAP for verification code for {email_address}")
# Extract email credentials from settings
imap_server = EMAIL_SETTINGS["imap_server"]
imap_port = EMAIL_SETTINGS["imap_port"]
imap_user = EMAIL_SETTINGS["email"]
imap_password = EMAIL_SETTINGS["password"]
# Display credentials with masked password
masked_password = "*" * len(imap_password) if imap_password else "not set"
logger.info(f"IMAP Config: Server={imap_server}, Port={imap_port}, User={imap_user}, Password={masked_password}")
# Validate credentials are not empty
if not imap_user or not imap_password:
logger.error("IMAP credentials not configured properly. Check your .env file.")
return None
start_time = time.time()
# Common verification code patterns
patterns = [
r'verification code[^0-9]*(d{4})', # Standard "verification code: 1234" format
r'code[^0-9]*(d{4})', # Simple "code: 1234" format
r'(d{4})', # HTML format with strong tags
r'>(d{4})<', # HTML format with angle brackets
r'(d{4}) is your', # "1234 is your" format
r'your code is (d{4})', # "your code is 1234" format
r'(d{4})' # Any 4-digit number as fallback
]
while time.time() - start_time < timeout:
# Connect to IMAP
mail, error = await connect_to_imap_server(imap_server, imap_port, imap_user, imap_password)
if mail:
try:
# First try a targeted search for the specific email address
logger.info(f"Searching for emails addressed to {email_address}")
specific_search = f'(TO "{email_address}")'
try:
status, data = mail.search(None, specific_search)
if status == 'OK' and data[0]:
logger.info(f"Found {len(data[0].split())} emails specifically for {email_address}")
# Check these emails first
code = await check_inbox_for_code(mail, email_address, patterns)
if code:
safely_close_imap_connection(mail)
return code
except Exception as e:
logger.error(f"Error during specific email search: {str(e)}")
# If no code found with targeted search, try a general search
logger.info("Trying general inbox search")
mail.select("INBOX")
code = await check_inbox_for_code(mail, email_address, patterns)
if code:
safely_close_imap_connection(mail)
return code
# If no code found, wait a bit and try again
logger.info(f"No verification code found for {email_address} yet, waiting to check again...")
# Close the connection properly
safely_close_imap_connection(mail)
# Wait before trying again
await asyncio.sleep(10)
except Exception as e:
logger.error(f"Error checking email: {str(e)}")
# If we had a connection issue, wait a bit before retrying
safely_close_imap_connection(mail)
await asyncio.sleep(10)
else:
# If connection failed, wait longer before retrying
logger.error(f"IMAP connection failed: {error}")
await asyncio.sleep(20)
logger.error(f"No verification code found after {timeout} seconds")
return None
class UberSession:
def __init__(self):
self.client = None
self.csrf_token = None
self.auth_session_id = None
self.fingerprint = {}
self.proxy = None
self.last_response = None
self.form_data = None
def save_last_response(self, response):
"""Save the last response data for debugging and session management"""
self.last_response = response
# Try to extract and save additional session data
try:
if response.headers.get('content-type', '').startswith(CONTENT_TYPE_JSON):
data = response.json()
# Save full response data for debugging
os.makedirs("debug_data", exist_ok=True)
with open("debug_data/last_response.json", "w") as f:
json.dump(data, f, indent=2)
if 'formContainerAnswer' in data:
# Save all form data for later use
self.form_data = data['formContainerAnswer']
logger.info("Successfully saved form data from response")
else:
logger.warning("Response JSON doesn't contain formContainerAnswer")
# Try saving the whole response as form data
self.form_data = data
logger.info("Saved entire response as form_data")
# Save any session ID if available
if 'formContainerAnswer' in data and 'inAuthSessionID' in data['formContainerAnswer']:
self.auth_session_id = data['formContainerAnswer']['inAuthSessionID']
logger.info(f"Saved auth session ID: {self.auth_session_id}")
# If we still don't have the session ID, try to extract it from the raw response
if not self.auth_session_id:
try:
session_id_match = re.search(r'"inAuthSessionID"s*:s*"([^"]+)"', response.text)
if session_id_match:
self.auth_session_id = session_id_match.group(1)
logger.info(f"Extracted auth session ID from raw response: {self.auth_session_id}")
except Exception as e:
logger.error(f"Error extracting session ID from raw response: {str(e)}")
# Check if we got a session ID in a cookie
if not self.auth_session_id and 'set-cookie' in response.headers:
try:
cookies_str = response.headers['set-cookie']
session_cookie_match = re.search(r'auth_session=([^;]+)', cookies_str)
if session_cookie_match:
self.auth_session_id = session_cookie_match.group(1)
logger.info(f"Extracted auth session ID from cookie: {self.auth_session_id}")
except Exception as e:
logger.error(f"Error extracting session ID from cookie: {str(e)}")
except Exception as e:
logger.error(f"Error saving response data: {e}")
logger.error(traceback.format_exc())
async def __aenter__(self):
self.client = httpx.AsyncClient(
timeout=30,
follow_redirects=True,
headers=self.generate_headers(),
http2=True
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Properly close the client
if self.client is not None:
await self.client.aclose()
def generate_headers(self):
"""Generate realistic browser headers with fingerprint"""
return {
"User-Agent": FINGERPRINT_CONFIG["userAgent"],
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Sec-Ch-Ua": ""Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"",
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": ""Windows"",
"x-csrf-token": "x",
"DNT": "1"
}
async def initialize(self, proxy=None):
try:
# Store proxy reference
self.proxy = proxy
# Configure proxy if provided
if self.proxy:
self.client.proxies = {"all://": self.proxy}
logger.info(f"Using proxy: {self.proxy.split('@')[-1] if '@' in self.proxy else self.proxy}")
# Generate browser fingerprint with safe proxy checks
network_ip = "127.0.0.1"
network_type = "broadband"
if self.proxy:
try:
network_ip = self.proxy.split('@')[-1].split(':')[0]
network_type = "mobile" if "mobile" in self.proxy else "broadband"
except Exception as e:
logger.warning(f"Proxy parsing error: {str(e)}")
self.fingerprint = {
"device_id": generate_uuid(),
"client_data": base64.b64encode(json.dumps(FINGERPRINT_CONFIG).encode()).decode(),
"screen_resolution": f"{FINGERPRINT_CONFIG['screen']['width']}x{FINGERPRINT_CONFIG['screen']['height']}",
"network": {
"ip": network_ip,
"type": network_type
}
}
# Make initialization request
headers = self.generate_headers()
headers["Content-Type"] = CONTENT_TYPE_JSON
headers["Origin"] = f"https://{AUTH_HOST}"
headers["Referer"] = f"https://{AUTH_HOST}/v2/?breeze_init_req_id={generate_uuid()}&breeze_local_zone=dca18&next_url=https%3A%2F%2Fwww.ubereats.com%2Flogin-redirect%2F"
headers["x-uber-analytics-session-id"] = generate_uuid()
headers["x-uber-client-name"] = "usl_desktop"
headers["x-uber-did"] = generate_uuid()
headers["x-uber-marketing-id"] = generate_uuid()
headers["x-uber-request-uuid"] = generate_uuid()
headers["x-uber-usl-id"] = generate_uuid()
# Create cookie header
cookies = {
"marketing_vistor_id": generate_uuid(),
"_ua": json.dumps({"session_id": generate_uuid(), "session_time_ms": int(time.time()*1000)}),
"usl.territory": "5.9LsYfEeVsxdKSYQzdEv9h30Qjqj+c1l5lhY04XkeLC8="
}
cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
headers["Cookie"] = cookie_str
# Create the formContainerAnswer payload based on the actual request data
payload = {
"formContainerAnswer": {
"inAuthSessionID": "",
"formAnswer": {
"flowType": FLOW_TYPE_INITIAL,
"standardFlow": True,
"accountManagementFlow": False,
"daffFlow": False,
"productConstraints": {
"isEligibleForWebOTPAutofill": False,
"uslFELibVersion": "",
"uslMobileLibVersion": "",
"isWhatsAppAvailable": False,
"isPublicKeyCredentialSupported": True,
"isFacebookAvailable": False,
"isRakutenAvailable": False,
"isKakaoAvailable": False
},
"additionalParams": {
"isEmailUpdatePostAuth": False
},
"deviceData": self.fingerprint["device_id"],
"nextURL": f"https://{UBER_HOST}/login-redirect/",
"sessionReferer": f"https://{UBER_HOST}/",
"uslURL": f"https://{AUTH_HOST}/v2/",
"screenAnswers": [],
"appContext": {
"stateToken": ""
}
}
}
}
# Add a delay to mimic human behavior
await asyncio.sleep(random.uniform(1.5, 3.0))
response = await self.client.post(
SUBMIT_FORM_URL,
json=payload,
headers=headers
)
# Log response for debugging
logger.info(f"HTTP Request: POST {SUBMIT_FORM_URL} "{response.status_code} {response.reason_phrase}"")
if response.status_code == 403:
raise Exception("Blocked by Uber security systems")
self.csrf_token = self.extract_csrf_token(response)
return True
except Exception as e:
logger.error(f"Initialization failed: {str(e)}")
return False
def extract_csrf_token(self, response):
"""Advanced CSRF token extraction with multiple fallbacks"""
try:
# Check modern JSON responses first
if response.headers.get('content-type', '').startswith(CONTENT_TYPE_JSON):
try:
data = response.json()
token = data.get('security', {}).get('csrfToken')
if token:
return token
except Exception as e:
logger.error(f"Error parsing JSON response: {str(e)}")
# Check HTML meta tags
try:
soup = BeautifulSoup(response.text, 'html.parser')
meta_token = soup.find('meta', {'name': 'uber-csrf-token'})
if meta_token:
return meta_token.get('content')
except Exception as e:
logger.error(f"Error parsing HTML for meta tags: {str(e)}")
# Check Next.js data
try:
soup = BeautifulSoup(response.text, 'html.parser')
script_data = soup.find('script', id='__NEXT_DATA__')
if script_data and script_data.string:
data = json.loads(script_data.string)
return data.get('props', {}).get('pageProps', {}).get('csrfToken')
except Exception as e:
logger.error(f"Error parsing Next.js data: {str(e)}")
# Final fallback to headers
return response.headers.get('x-csrf-token')
except Exception as e:
logger.error(f"CSRF extraction error: {str(e)}")
return None
async def solve_recaptcha_v2(site_key, page_url):
"""Solve reCAPTCHA v2 with 2Captcha"""
async with httpx.AsyncClient() as client:
# Create task
data = {
"key": CAPTCHA_API_KEY,
"method": "userrecaptcha",
"googlekey": site_key,
"pageurl": page_url,
"json": 1,
"invisible": 1
}
response = await client.post("https://2captcha.com/in.php", data=data)
task_id = response.json().get('request')
# Retrieve solution
for _ in range(30):
await asyncio.sleep(5)
response = await client.get(
"https://2captcha.com/res.php",
params={"key": CAPTCHA_API_KEY, "action": "get", "id": task_id, "json": 1}
)
if response.json().get('status') == 1:
return response.json().get('request')
return None
async def create_verification_payload(session, code):
"""Create verification payload based on session data"""
# Check for a valid session ID first
if not session.auth_session_id:
logger.warning("No auth_session_id available for verification payload")
# First try to find it in debug files
try:
if os.path.exists("debug_data/email_submission_response.json"):
with open("debug_data/email_submission_response.json", "r") as f:
response_data = json.load(f)
if 'formContainerAnswer' in response_data and 'inAuthSessionID' in response_data['formContainerAnswer']:
session.auth_session_id = response_data['formContainerAnswer']['inAuthSessionID']
logger.info(f"Recovered auth_session_id from saved file: {session.auth_session_id}")
# Check all raw response files for session IDs
if not session.auth_session_id:
for filename in os.listdir("debug_data"):
if filename.startswith("email_submission_full_response_"):
try:
with open(f"debug_data/{filename}", "r", encoding="utf-8") as f:
content = f.read()
session_id_match = re.search(r'"inAuthSessionID"s*:s*"([^"]+)"', content)
if session_id_match:
session.auth_session_id = session_id_match.group(1)
logger.info(f"Found auth_session_id in {filename}: {session.auth_session_id}")
break
except Exception as e:
logger.error(f"Error searching file {filename}: {str(e)}")
except Exception as e:
logger.error(f"Error recovering session ID from files: {str(e)}")
sample_payload = None
try:
if os.path.exists("debug_data/successful_payload.json"):
with open("debug_data/successful_payload.json", "r") as f:
sample_payload = json.load(f)
logger.info("Loaded successful payload template")
else:
# Create a sample successful payload based on your provided example
sample_payload = {
"formContainerAnswer": {
"inAuthSessionID": session.auth_session_id or "",
"formAnswer": {
"flowType": "SIGN_UP",
"standardFlow": True,
"accountManagementFlow": False,
"daffFlow": False,
"productConstraints": {
"isEligibleForWebOTPAutofill": False,
"uslFELibVersion": "",
"uslMobileLibVersion": "",
"isWhatsAppAvailable": False,
"isPublicKeyCredentialSupported": True,
"isFacebookAvailable": False,
"isRakutenAvailable": False,
"isKakaoAvailable": False
},
"additionalParams": {
"isEmailUpdatePostAuth": False
},
"deviceData": session.fingerprint.get("device_id", ""),
"nextURL": f"https://{UBER_HOST}/login-redirect/",
"sessionReferer": f"https://{UBER_HOST}/",
"uslURL": f"https://{AUTH_HOST}/v2/",
"screenAnswers": [
{
"screenType": EMAIL_OTP_CODE,
"eventType": "TypeEmailOTP",
"fieldAnswers": [
{
"fieldType": EMAIL_OTP_CODE,
"emailOTPCode": code
}
]
}
]
}
}
}
# Save this for future use
os.makedirs("debug_data", exist_ok=True)
with open("debug_data/successful_payload.json", "w") as f:
json.dump(sample_payload, f, indent=2)
except Exception as e:
logger.error(f"Error with sample payload: {str(e)}")
logger.error(traceback.format_exc())
# If we don't have a valid session ID, generate one only as a last resort
if not session.auth_session_id:
logger.warning("Still no auth_session_id found, may need to debug the email submission response")
# Proceed with creating the payload
if hasattr(session, 'form_data') and session.form_data:
# Use data from initial response to construct the payload
payload = {
"formContainerAnswer": {
"inAuthSessionID": session.auth_session_id or "",
"formAnswer": {
# Preserve ALL fields from the original response
"flowType": session.form_data.get('formAnswer', {}).get('flowType', FLOW_TYPE_SIGN_UP),
"standardFlow": session.form_data.get('formAnswer', {}).get('standardFlow', True),
"accountManagementFlow": session.form_data.get('formAnswer', {}).get('accountManagementFlow', False),
"daffFlow": session.form_data.get('formAnswer', {}).get('daffFlow', False),
"productConstraints": session.form_data.get('formAnswer', {}).get('productConstraints', {}),
"additionalParams": session.form_data.get('formAnswer', {}).get('additionalParams', {}),
"deviceData": session.form_data.get('formAnswer', {}).get('deviceData', session.fingerprint.get("device_id", "")),
"nextURL": session.form_data.get('formAnswer', {}).get('nextURL', ""),
"sessionReferer": session.form_data.get('formAnswer', {}).get('sessionReferer', ""),
"uslURL": session.form_data.get('formAnswer', {}).get('uslURL', ""),
# Add the verification screen answer with the REAL code
"screenAnswers": [
{
"screenType": EMAIL_OTP_CODE,
"eventType": "TypeEmailOTP",
"fieldAnswers": [
{
"fieldType": EMAIL_OTP_CODE,
"emailOTPCode": code
}
]
}
]
}
}
}
elif sample_payload:
# Use the sample payload we know works
payload = sample_payload
# Update the session ID and code
payload["formContainerAnswer"]["inAuthSessionID"] = session.auth_session_id or ""
payload["formContainerAnswer"]["formAnswer"]["screenAnswers"][0]["fieldAnswers"][0]["emailOTPCode"] = code
payload["formContainerAnswer"]["formAnswer"]["deviceData"] = session.fingerprint.get("device_id", "")
logger.info("Using sample successful payload template with current session data")
else:
# Last resort fallback
logger.warning("No form data or sample template, using basic fallback payload")
payload = {
"formContainerAnswer": {
"inAuthSessionID": session.auth_session_id or "",
"formAnswer": {
"flowType": FLOW_TYPE_SIGN_UP,
"standardFlow": True,
"accountManagementFlow": False,
"daffFlow": False,
"deviceData": session.fingerprint.get("device_id", ""),
"nextURL": f"https://{UBER_HOST}/login-redirect/",
"sessionReferer": f"https://{UBER_HOST}/",
"uslURL": f"https://{AUTH_HOST}/v2/",
"screenAnswers": [
{
"screenType": EMAIL_OTP_CODE,
"eventType": "TypeEmailOTP",
"fieldAnswers": [
{
"fieldType": EMAIL_OTP_CODE,
"emailOTPCode": code
}
]
}
]
}
}
}
return payload
async def submit_verification_code(session, payload, url, attempt_num):
"""Submit verification code to specified URL"""
try:
# Set up headers
headers = session.generate_headers()
headers["X-Csrf-Token"] = session.csrf_token or "x"
headers["Content-Type"] = CONTENT_TYPE_JSON
headers["Accept"] = ACCEPT_ALL
headers["X-Uber-Client-Name"] = "usl_desktop"
headers["Origin"] = f"https://{AUTH_HOST}"
headers["Referer"] = f"https://{AUTH_HOST}/v2/?breeze_init_req_id={generate_uuid()}&breeze_local_zone=dca18&next_url=https%3A%2F%2Fwww.ubereats.com%2Flogin-redirect%2F"
headers["x-uber-analytics-session-id"] = generate_uuid()
headers["x-uber-did"] = generate_uuid()
headers["x-uber-marketing-id"] = generate_uuid()
headers["x-uber-request-uuid"] = generate_uuid()
headers["x-uber-usl-id"] = generate_uuid()
# Create cookie header - this can help with authentication
cookies = {
"marketing_vistor_id": generate_uuid(),
"_ua": json.dumps({"session_id": generate_uuid(), "session_time_ms": int(time.time()*1000)}),
"usl.territory": "5.9LsYfEeVsxdKSYQzdEv9h30Qjqj+c1l5lhY04XkeLC8="
}
cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
headers["Cookie"] = cookie_str
# Log what we're sending
logger.info(f"Submitting verification code to {url}")
logger.info(f"Session ID: {payload.get('formContainerAnswer', {}).get('inAuthSessionID', 'None')}")
# Submit the code
response = await session.client.post(
url,
json=payload,
headers=headers
)
# Log the response status
logger.info(f"HTTP Request: POST {url} "{response.status_code} {response.reason_phrase}"")
# Save response for debugging
debug_path = f"debug_data/verification_response_{url.split('/')[-2]}_{attempt_num}.json"
with open(debug_path, "w") as f:
f.write(response.text)
# Save a copy of the successful payload if this works
if response.status_code == 200:
with open("debug_data/successful_payload.json", "w") as f:
json.dump(payload, f, indent=2)
logger.info(f"Saved successful payload for future use")
return response
except Exception as e:
logger.error(f"Error submitting verification code: {str(e)}")
logger.error(traceback.format_exc())
return None
async def process_verification(session, email, session_id=None):
"""Handle verification with proper email code retrieval"""
logger.info(f"Starting verification process for email: {email}")
# Use session ID parameter if provided
if session_id and not session.auth_session_id:
logger.info(f"Setting session ID from parameter: {session_id}")
session.auth_session_id = session_id
# Check global session ID if still missing
global CURRENT_SESSION_ID
if not session.auth_session_id and CURRENT_SESSION_ID:
logger.info(f"Using global session ID: {CURRENT_SESSION_ID}")
session.auth_session_id = CURRENT_SESSION_ID
# Try to load from file as last resort
if not session.auth_session_id:
try:
if os.path.exists("debug_data/current_session_id.txt"):
with open("debug_data/current_session_id.txt", "r") as f:
file_session_id = f.read().strip()
if file_session_id:
logger.info(f"Loaded session ID from file: {file_session_id}")
session.auth_session_id = file_session_id
except Exception as e:
logger.error(f"Error loading session ID from file: {str(e)}")
# Debug the session state before verification
logger.info(f"Session state before verification: auth_session_id={session.auth_session_id or 'None'}")
logger.info(f"Has form_data: {hasattr(session, 'form_data') and session.form_data is not None}")
# Attempt to retrieve the actual verification code from email
code = await fetch_verification_code_via_imap(email, timeout=90)
if not code:
logger.error("Failed to retrieve verification code from email")
return False
logger.info(f"Successfully retrieved verification code: {code}")
# Save the verification code so user can manually try it if needed
with open(f"debug_data/verification_code_{email.split('@')[0]}.txt", "w") as f:
f.write(f"Email: {email}
Verification Code: {code}
Timestamp: {datetime.datetime.now()}")
# Check for session ID in raw response files if still missing
if not session.auth_session_id:
for filename in os.listdir("debug_data"):
if filename.startswith("email_submission_full_response_"):
try:
with open(f"debug_data/{filename}", "r", encoding="utf-8") as f:
content = f.read()
match = re.search(r'"inAuthSessionID"s*:s*"([^"]+)"', content)
if match:
session.auth_session_id = match.group(1)
logger.info(f"Found session ID in file {filename}: {session.auth_session_id}")
break
except Exception as e:
logger.error(f"Error reading file {filename}: {str(e)}")
# Absolutely DO NOT generate a new session ID here
if not session.auth_session_id:
logger.error("Cannot proceed without a valid session ID from the server")
logger.error("Please check the email submission response for a valid session ID")
return False
# Create verification payload
payload = await create_verification_payload(session, code)
# Log and save the payload for debugging
logger.info(f"Using actual verification code: {code}")
os.makedirs("debug_data", exist_ok=True)
with open("debug_data/verification_payload.json", "w") as f:
json.dump(payload, f, indent=2)
# Try GEO URL first since we know it works from the successful example
for attempt in range(3):
# Try the geo URL first this time since we have a successful example from it
response = await submit_verification_code(
session, payload, SUBMIT_OTP_URL, attempt+1
)
if response and response.status_code == 200:
logger.info("Verification successful with geo URL!")
return True
if response:
logger.error(f"Geo URL verification failed: {response.text}")
# If we got a response, try to learn from it
try:
error_data = response.json()
# Check for specific error messages
if "screenErrors" in error_data:
for screen_error in error_data["screenErrors"]:
if "errors" in screen_error and EMAIL_OTP_CODE in screen_error["errors"]:
error_info = screen_error["errors"][EMAIL_OTP_CODE]
error_type = error_info.get("errorType")
error_message = error_info.get("message")
logger.error(f"Error type: {error_type}, Message: {error_message}")
except Exception as e:
logger.error(f"Error analyzing response: {str(e)}")
# Try the main auth URL as fallback
response = await submit_verification_code(
session, payload, SUBMIT_FORM_URL, attempt+1
)
if response and response.status_code == 200:
logger.info("Verification successful with main URL!")
return True
if response:
logger.error(f"Main URL verification failed: {response.text}")
# Wait before retry
await asyncio.sleep(5)
logger.error("All verification attempts failed")
return False
async def create_email_submission_payload(session, email):
"""Create payload for initial email submission"""
return {
"formContainerAnswer": {
"inAuthSessionID": "",
"formAnswer": {
"flowType": FLOW_TYPE_INITIAL,
"standardFlow": True,
"accountManagementFlow": False,
"daffFlow": False,
"productConstraints": {
"isEligibleForWebOTPAutofill": False,
"uslFELibVersion": "",
"uslMobileLibVersion": "",
"isWhatsAppAvailable": False,
"isPublicKeyCredentialSupported": True,
"isFacebookAvailable": False,
"isRakutenAvailable": False,
"isKakaoAvailable": False
},
"additionalParams": {
"isEmailUpdatePostAuth": False
},
"deviceData": session.fingerprint["device_id"],
"nextURL": f"https://{UBER_HOST}/login-redirect/",
"sessionReferer": f"https://{UBER_HOST}/",
"uslURL": f"https://{AUTH_HOST}/v2/",
"screenAnswers": [
{
"screenType": "PHONE_NUMBER_INITIAL",
"eventType": "TypeInputEmail",
"fieldAnswers": [
{
"fieldType": EMAIL_ADDRESS,
"emailAddress": email
}
]
}
],
"appContext": {
"stateToken": ""
}
}
}
}
async def submit_email(session, email):
"""Submit initial email for signup"""
try:
# Prepare headers
headers = session.generate_headers()
headers["X-Csrf-Token"] = session.csrf_token or "x"
headers["Content-Type"] = CONTENT_TYPE_JSON
headers["Origin"] = f"https://{AUTH_HOST}"
headers["Referer"] = f"https://{AUTH_HOST}/v2/?breeze_init_req_id={generate_uuid()}&breeze_local_zone=dca18&next_url=https%3A%2F%2Fwww.ubereats.com%2Flogin-redirect%2F"
headers["Accept"] = ACCEPT_ALL
headers["X-Uber-Challenge-Provider"] = "ARKOSE_TOKEN"
headers["X-Uber-Client-Name"] = "usl_desktop"
headers["x-uber-analytics-session-id"] = generate_uuid()
headers["x-uber-did"] = generate_uuid()
headers["x-uber-marketing-id"] = generate_uuid()
headers["x-uber-request-uuid"] = generate_uuid()
headers["x-uber-usl-id"] = generate_uuid()
# Create cookie header
cookies = {
"marketing_vistor_id": generate_uuid(),
"_ua": json.dumps({"session_id": generate_uuid(), "session_time_ms": int(time.time()*1000)}),
"usl.territory": "5.9LsYfEeVsxdKSYQzdEv9h30Qjqj+c1l5lhY04XkeLC8="
}
cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
headers["Cookie"] = cookie_str
# Create the payload
payload = await create_email_submission_payload(session, email)
# Save the payload for debugging
os.makedirs("debug_data", exist_ok=True)
with open("debug_data/email_submission_payload.json", "w") as f:
json.dump(payload, f, indent=2)
# Add a delay to mimic human behavior
await asyncio.sleep(random.uniform(1.5, 3.0))
# Submit the email
response = await session.client.post(
SUBMIT_FORM_URL,
json=payload,
headers=headers
)
# Log the response
logger.info(f"HTTP Request: POST {SUBMIT_FORM_URL} "{response.status_code} {response.reason_phrase}"")
# Save the complete raw response for debugging
try:
debug_path = f"debug_data/email_submission_full_response_{int(time.time())}.txt"
with open(debug_path, "w", encoding="utf-8") as f:
f.write(f"Status: {response.status_code}
")
f.write(f"Headers: {dict(response.headers)}
")
f.write(f"Body: {response.text}")
logger.info(f"Saved full response to {debug_path}")
except Exception as e:
logger.error(f"Error saving full response: {str(e)}")
# Extract session ID directly from the response text using regex
if response.text and '"inAuthSessionID"' in response.text:
match = re.search(r'"inAuthSessionID"s*:s*"([^"]+)"', response.text)
if match:
extracted_session_id = match.group(1)
logger.info(f"Extracted auth session ID using regex: {extracted_session_id}")
# SET THE SESSION ID IN THE SESSION OBJECT
session.auth_session_id = extracted_session_id
# Also save it to a global variable for backup
global CURRENT_SESSION_ID
CURRENT_SESSION_ID = extracted_session_id
# Save to file for debugging
with open("debug_data/current_session_id.txt", "w") as f:
f.write(f"{extracted_session_id}")
# Save the JSON response for debugging
if response.status_code == 200:
try:
response_data = response.json()
with open("debug_data/email_submission_response.json", "w") as f:
json.dump(response_data, f, indent=2)
# Try to extract session ID from JSON structure as well
if 'formContainerAnswer' in response_data and 'inAuthSessionID' in response_data['formContainerAnswer']:
json_session_id = response_data['formContainerAnswer']['inAuthSessionID']
if json_session_id:
logger.info(f"Successfully extracted auth session ID from JSON: {json_session_id}")
session.auth_session_id = json_session_id # Set it again to be sure
else:
logger.warning("Failed to extract auth session ID from JSON response")
except Exception as e:
logger.error(f"Error parsing email submission response: {str(e)}")
logger.error(traceback.format_exc())
# Double-check that session ID is set
logger.info(f"Final session ID before continuing: {session.auth_session_id}")
# Save the response data in the session
session.save_last_response(response)
return response
except Exception as e:
logger.error(f"Error submitting email: {str(e)}")
logger.error(traceback.format_exc())
return None
async def handle_captcha(session, response, email):
"""Handle CAPTCHA challenges during signup"""
try:
site_key_match = re.search(r'sitekey:s*"([^"]+)"', response.text)
if site_key_match:
token = await solve_recaptcha_v2(site_key_match.group(1), str(response.url))
if token:
# Prepare headers with captcha token
headers = session.generate_headers()
headers["X-Csrf-Token"] = session.csrf_token or "x"
headers["Content-Type"] = CONTENT_TYPE_JSON
headers["Origin"] = f"https://{AUTH_HOST}"
headers["Referer"] = f"https://{AUTH_HOST}/v2/?breeze_init_req_id={generate_uuid()}&breeze_local_zone=dca18&next_url=https%3A%2F%2Fwww.ubereats.com%2Flogin-redirect%2F"
headers["Accept"] = ACCEPT_ALL
headers["X-Uber-Challenge-Provider"] = "ARKOSE_TOKEN"
headers["X-Uber-Challenge-Token"] = token
headers["X-Uber-Client-Name"] = "usl_desktop"
# Create updated payload with captcha token
payload = await create_email_submission_payload(session, email)
payload["formContainerAnswer"]["formAnswer"]["challengeResponse"] = token
# Submit with captcha
response = await session.client.post(
SUBMIT_FORM_URL,
json=payload,
headers=headers
)
# Save this response too
session.save_last_response(response)
return response
except Exception as e:
logger.error(f"Error handling captcha: {str(e)}")
return None
async def signup_flow(email, proxy=None):
"""Complete signup workflow with debug process"""
async with UberSession() as session:
if not await session.initialize(proxy):
return False
try:
# Submit email
response = await submit_email(session, email)
# Process the response and extract session ID
if response and response.status_code == 200:
try:
response_data = response.json()
# Extract and save the session ID
session.auth_session_id = response_data.get('formContainerAnswer', {}).get('inAuthSessionID')
if session.auth_session_id:
logger.info(f"Successfully extracted auth session ID: {session.auth_session_id[:10]}...")
else:
logger.warning("Failed to extract auth session ID from response")
# Save response data in the session object
if 'formContainerAnswer' in response_data:
session.form_data = response_data.get('formContainerAnswer')
logger.info("Successfully saved form data for verification")
# Check if we got an OTP verification request directly
if "form" in response_data and "screens" in response_data.get("form", {}):
screens = response_data["form"]["screens"]
for screen in screens:
if screen.get("screenType") == "EMAIL_OTP_CODE":
logger.info("Initial response requesting EMAIL_OTP_CODE verification")
# Extract the new session ID if available
new_session_id = response_data.get("inAuthSessionID")
if new_session_id:
logger.info(f"Got new session ID: {new_session_id}")
# Handle the OTP verification directly
return await handle_otp_verification(session, email, new_session_id)
# Standard flow - proceed with verification
logger.info(f"Successfully submitted email, waiting for verification code")
# Process verification
verification_success = await process_verification(session, email)
if verification_success:
logger.info("Email verification successful, proceeding to final registration")
# Add a delay before final step to mimic human behavior
await asyncio.sleep(random.uniform(2.0, 4.0))
# *** NEW APPROACH: Run debug process to try multiple variations ***
logger.info("Running comprehensive debug registration process")
debug_result = await debug_registration_process(email, session.auth_session_id)
if debug_result:
logger.info("Debug registration process worked!")
return True
# If all else fails, try the standard complete registration
logger.info("Debug approach failed, trying standard registration")
return await complete_registration(session, email)
else:
logger.error("Email verification failed, cannot complete registration")
return False
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Failed to parse response: {str(e)}")
logger.error(traceback.format_exc())
# Handle CAPTCHA challenges
if response and (response.status_code == 403 or "captcha" in response.text.lower() or "challenge" in response.text.lower()):
captcha_response = await handle_captcha(session, response, email)
if captcha_response and captcha_response.status_code == 200:
try:
response_data = captcha_response.json()
# Extract and save session ID
session.auth_session_id = response_data.get('formContainerAnswer', {}).get('inAuthSessionID')
if session.auth_session_id:
logger.info(f"Successfully extracted auth session ID (captcha): {session.auth_session_id[:10]}...")
else:
logger.warning("Failed to extract auth session ID from captcha response")
# Save response data
if 'formContainerAnswer' in response_data:
session.form_data = response_data.get('formContainerAnswer')
logger.info("Successfully saved form data from captcha response")
# Check if we got an OTP verification request directly
if "form" in response_data and "screens" in response_data.get("form", {}):
screens = response_data["form"]["screens"]
for screen in screens:
if screen.get("screenType") == "EMAIL_OTP_CODE":
logger.info("Captcha response requesting EMAIL_OTP_CODE verification")
# Extract the new session ID if available
new_session_id = response_data.get("inAuthSessionID")
if new_session_id:
logger.info(f"Got new session ID: {new_session_id}")
# Handle the OTP verification directly
return await handle_otp_verification(session, email, new_session_id)
# Standard flow - proceed with verification
logger.info(f"Successfully submitted email with captcha, waiting for verification code")
# Process verification
verification_success = await process_verification(session, email)
if verification_success:
logger.info("Email verification successful, proceeding to final registration")
# Add a delay before final step
await asyncio.sleep(random.uniform(2.0, 4.0))
# *** NEW APPROACH: Run debug process to try multiple variations ***
logger.info("Running comprehensive debug registration process")
debug_result = await debug_registration_process(email, session.auth_session_id)
if debug_result:
logger.info("Debug registration process worked!")
return True
# If all else fails, try the standard complete registration
logger.info("Debug approach failed, trying standard registration")
return await complete_registration(session, email)
else:
logger.error("Email verification failed, cannot complete registration")
return False
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Failed to parse captcha response: {str(e)}")
logger.error(traceback.format_exc())
except httpx.ProxyError as e:
logger.error(f"Proxy error: {str(e)}")
except Exception as e:
logger.error(f"Signup error: {str(e)}")
logger.error(traceback.format_exc())
return False
def load_proxies(file_path='proxies.txt'):
"""Load proxies from file with validation"""
if not os.path.exists(file_path):
logger.warning(f"Proxy file {file_path} not found")
return []
with open(file_path, 'r') as f:
proxies = [line.strip() for line in f if line.strip()]
logger.info(f"Loaded {len(proxies)} proxies")
return proxies
def get_random_proxy(proxy_list):
"""Get random proxy with format validation"""
if not proxy_list:
return None
# Shuffle the proxy list to avoid using the same ones repeatedly
shuffled_proxies = proxy_list.copy()
random.shuffle(shuffled_proxies)
for proxy in shuffled_proxies:
try:
# Handle host:port:user:pass format
if proxy.count(':') == 3:
host, port, user, pwd = proxy.split(':', 3)
formatted_proxy = f"http://{user}:{pwd}@{host}:{port}"
logger.info(f"Using authenticated proxy: {host}:{port}")
return formatted_proxy
# Handle host:port format
elif proxy.count(':') == 1:
host, port = proxy.split(':', 1)
formatted_proxy = f"http://{host}:{port}"
logger.info(f"Using proxy: {host}:{port}")
return formatted_proxy
if not proxy.startswith('http://') and not proxy.startswith('https://'):
formatted_proxy = f"http://{proxy}"
logger.info(f"Using proxy: {proxy}")
return formatted_proxy
logger.info(f"Using proxy: {proxy}")
return proxy
except Exception as e:
logger.error(f"Invalid proxy format: {proxy} - {str(e)}")
logger.warning("No valid proxies found, continuing without proxy")
return None
async def debug_registration_process(email, session_id, actual_payload=None):
"""
Debug the registration process by sending a series of diagnostic requests
with detailed logging and different approaches
"""
logger.info("Starting detailed registration debugging process")
if not os.path.exists("debug_data"):
os.makedirs("debug_data")
with open(f"debug_data/debug_session_{email.split('@')[0]}.txt", "w") as f:
f.write(f"Email: {email}
")
f.write(f"Session ID: {session_id}
")
f.write(f"Time: {datetime.datetime.now()}
")
try:
async with httpx.AsyncClient(timeout=30, follow_redirects=True, http2=True) as client:
logger.info("Step 1: Testing basic auth endpoint connectivity")
basic_response = await client.get(
f"https://{AUTH_HOST}/v2/",
headers={
"User-Agent": FINGERPRINT_CONFIG["userAgent"],
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
)
logger.info(f"Basic auth response: {basic_response.status_code}")
cookies = {}
if "set-cookie" in basic_response.headers:
logger.info("Found cookies in basic auth response")
try:
cookie_header = basic_response.headers["set-cookie"]
for part in cookie_header.split(", "):
if "=" in part:
name = part.split("=")[0]
value = part.split("=")[1].split(";")[0]
cookies[name] = value
except Exception as e:
logger.error(f"Error parsing cookies: {str(e)}")
with open(f"debug_data/step1_basic_auth_{email.split('@')[0]}.txt", "w") as f:
f.write(f"Status: {basic_response.status_code}
")
f.write(f"Headers: {dict(basic_response.headers)}
")
f.write(f"Body: {basic_response.text[:5000]}")
logger.info("Step 2: Getting CSRF token")
csrf_token = None
if basic_response.status_code == 200:
# Try to extract CSRF token from the HTML
try:
soup = BeautifulSoup(basic_response.text, "html.parser")
meta_token = soup.find("meta", {"name": "uber-csrf-token"})
if meta_token:
csrf_token = meta_token.get("content")
logger.info(f"Found CSRF token: {csrf_token[:10]}...")
except Exception as e:
logger.error(f"Error extracting CSRF token: {str(e)}")
logger.info("Step 3: Testing multiple registration request variations")
base_payload = {
"formContainerAnswer": {
"inAuthSessionID": session_id,
"formAnswer": {
"flowType": "SIGN_UP",
"standardFlow": True,
"accountManagementFlow": False,
"daffFlow": False,
"productConstraints": {
"isEligibleForWebOTPAutofill": False,
"uslFELibVersion": "",
"uslMobileLibVersion": "",
"isWhatsAppAvailable": False,
"isPublicKeyCredentialSupported": True,
"isFacebookAvailable": False,
"isRakutenAvailable": False,
"isKakaoAvailable": False
},
"additionalParams": {
"isEmailUpdatePostAuth": False
},
"deviceData": "IsOFwpJePcOFw5sVO8KFwoIBb8OWw5QPbMKEw5UFbcOXw5YFe",
"nextURL": "https://www.ubereats.com/login-redirect/?app_clip=false&campaign=signin_universal_link&guest_mode=false",
"sessionReferer": "https://www.ubereats.com/",
"uslURL": "https://auth.uber.com/v2/",
"screenAnswers": [
{
"screenType": "LEGAL",
"eventType": "TypeSignupLegal",
"fieldAnswers": [
{
"fieldType": "LEGAL_CONFIRMATION",
"legalConfirmation": True
},
{
"fieldType": "LEGAL_CONFIRMATIONS",
"legalConfirmations": {
"legalConfirmations": [
{
"disclosureVersionUUID": "ef1d61c9-b09e-4d44-8cfb-ddfa15cc7523",
"isAccepted": True
}
]
}
}
]
}
]
}
}
}
payload = actual_payload or base_payload
# Set up cookie string
cookie_str = ""
if cookies:
cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
# Try request to Auth endpoint with different header combinations
variations = [
{
"name": "basic",
"url": SUBMIT_FORM_URL,
"headers": {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": FINGERPRINT_CONFIG["userAgent"],
"Referer": f"https://{AUTH_HOST}/v2/"
}
},
{
"name": "with_csrf",
"url": SUBMIT_FORM_URL,
"headers": {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": FINGERPRINT_CONFIG["userAgent"],
"Referer": f"https://{AUTH_HOST}/v2/",
"X-Csrf-Token": csrf_token or "x"
}
},
{
"name": "with_cookies",
"url": SUBMIT_FORM_URL,
"headers": {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": FINGERPRINT_CONFIG["userAgent"],
"Referer": f"https://{AUTH_HOST}/v2/",
"Cookie": cookie_str
}
},
{
"name": "full_auth",
"url": SUBMIT_FORM_URL,
"headers": {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": FINGERPRINT_CONFIG["userAgent"],
"Referer": f"https://{AUTH_HOST}/v2/",
"X-Csrf-Token": csrf_token or "x",
"Cookie": cookie_str,
"Origin": f"https://{AUTH_HOST}"
}
},
{
"name": "exact_headers",
"url": SUBMIT_OTP_URL,
"headers": {
"authority": "cn-geo1.uber.com",
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"accept-language": "en-US,en;q=0.9",
"content-type": "application/json",
"origin": "https://auth.uber.com",
"referer": "https://auth.uber.com/",
"sec-ch-ua": ""Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": ""Windows"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"x-uber-analytics-session-id": "2dac1934-977e-488b-9299-36a148446c1e",
"x-uber-client-name": "usl_desktop",
"x-uber-did": "11ea941e-39dc-45ea-913b-9f59404332b7",
"x-uber-marketing-id": "a131ed4d-ea48-421e-8557-fc3ed9c03044",
"x-uber-request-uuid": "59ae2b19-9f32-4beb-a769-99498a3ebb09",
"x-uber-usl-id": "11ea941e-39dc-45ea-913b-9f59404332b7"
}
}
]
for variation in variations:
logger.info(f"Testing variation: {variation['name']}")
try:
response = await client.post(
variation["url"],
json=payload,
headers=variation["headers"]
)
logger.info(f"Variation {variation['name']} response: {response.status_code}")
with open(f"debug_data/step3_{variation['name']}_{email.split('@')[0]}.txt", "w") as f:
f.write(f"Status: {response.status_code}
")
f.write(f"Headers: {dict(response.headers)}
")
f.write(f"Body: {response.text}")
# Check for success
if response.status_code == 200:
logger.info(f"✅ Success with variation: {variation['name']}")
# Log the successful approach
with open("accounts/successful_emails.txt", "a") as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {email} (DEBUG SUCCESS - {variation['name']})
")
# Save account details
with open(f"accounts/{email.split('@')[0]}_complete.txt", "w") as f:
f.write(f"Email: {email}
")
f.write(f"Date Created: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
")
f.write(f"Session ID: {session_id}
")
f.write(f"Completion Method: Debug variation - {variation['name']}
")
return True
except Exception as e:
logger.error(f"Error testing variation {variation['name']}: {str(e)}")
logger.error("All registration variations failed")
return False
except Exception as e:
logger.error(f"Error in debug registration process: {str(e)}")
logger.error(traceback.format_exc())
return False
async def main():
"""Main execution flow with advanced rotation"""
proxies = load_proxies()
# Create output directories
os.makedirs("debug_data", exist_ok=True)
os.makedirs("accounts", exist_ok=True)
# Set up counters for statistics
stats = {
"attempts": 0,
"successes": 0,
"captchas_encountered": 0,
"proxy_errors": 0
}
for attempt in range(10):
stats["attempts"] += 1
proxy = get_random_proxy(proxies)
email = generate_random_email()
logger.info(f"Attempt {attempt+1}/10 with {email}")
logger.info(f"Stats: {stats}")
try:
if attempt > 0:
delay = random.randint(20, 60)
logger.info(f"Waiting {delay} seconds before next attempt...")
await asyncio.sleep(delay)
if await signup_flow(email, proxy):
stats["successes"] += 1
logger.info(f"Account created successfully with email: {email}!")
with open("accounts/successful_emails.txt", "a") as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {email}
")
with open(f"accounts/{email.split('@')[0]}.txt", "w") as f:
f.write(f"Email: {email}
")
f.write(f"Date Created: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
")
f.write(f"Proxy Used: {proxy if proxy else 'None'}
")
f.write(f"User-Agent: {FINGERPRINT_CONFIG['userAgent']}
")
logger.info(f"Final Stats: Attempts: {stats['attempts']}, Successes: {stats['successes']}")
return
except httpx.ProxyError:
stats["proxy_errors"] += 1
logger.error(f"Proxy error with {proxy}")
continue
except Exception as e:
logger.error(f"Attempt failed: {str(e)}")
traceback_str = traceback.format_exc()
logger.debug(f"Traceback: {traceback_str}")
with open(f"debug_data/failure_{attempt+1}.log", "w") as f:
f.write(f"Attempt: {attempt+1}
")
f.write(f"Email: {email}
")
f.write(f"Proxy: {proxy}
")
f.write(f"Error: {str(e)}
")
f.write(traceback_str)
await asyncio.sleep(attempt * 15)
logger.error(f"Failed after {stats['attempts']} attempts")
logger.info(f"Final Stats: {stats}")
async def run_with_error_handling():
os.makedirs("debug_data", exist_ok=True)
os.makedirs("accounts", exist_ok=True)
os.makedirs("logs", exist_ok=True)
start_time = time.time()
try:
await main()
except KeyboardInterrupt:
logger.info("👋 Program interrupted by user. Exiting...")
except Exception as e:
logger.critical(f"💥 Critical error: {str(e)}")
logger.critical(traceback.format_exc())
with open("logs/crash_report.txt", "a") as f:
f.write(f"
--- CRASH REPORT: {datetime.datetime.now()} ---
")
f.write(f"Error: {str(e)}
")
f.write(traceback.format_exc())
f.write("
--- END CRASH REPORT ---
")
finally:
runtime = time.time() - start_time
logger.info(f"⏱️ Total runtime: {int(runtime//60)} minutes and {int(runtime%60)} seconds")
if __name__ == "__main__":
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.2164.39"
]
FINGERPRINT_CONFIG["userAgent"] = random.choice(user_agents)
run_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"logs/run_{run_id}.log", "w") as f:
f.write(f"UberEats Signup Automation Run: {run_id}
")
f.write(f"User-Agent: {FINGERPRINT_CONFIG['userAgent']}
")
f.write("-" * 50 + "
")
asyncio.run(run_with_error_handling())