🌸 Welcome to the Python Office Automation column — use Python to handle office tasks and free up your hands
💻 Personal homepage ——> personal homepage, visits welcome
😸 GitHub homepage ——> GitHub homepage, visits welcome
❓ Zhihu homepage ——> Zhihu homepage, visits welcome
🏳️🌈 CSDN blog homepage: please click ——> 一晌小贪欢's blog homepage, a follow is appreciated
👍 Column for this series: please click ——> Python Office Automation column, subscriptions appreciated
🕷 There is also a web scraping column: please click ——> Python Web Scraping Basics column, subscriptions appreciated
📕 There is also a Python basics column: please click ——> Python Basics column, subscriptions appreciated
The author's skill and knowledge are limited; if you find mistakes in this article, corrections are welcome 🙏
❤️ A follow from you would be much appreciated! ❤️
Course Objectives
- Understand how proxies work and the main proxy types
- Master building and managing a proxy pool
- Learn IP rotation and anti-anti-crawler techniques
- Understand user-agent spoofing and request header optimization
1. Proxy Basics
1.1 What Is a Proxy?
A proxy server acts as an intermediary between the client and the target server, hiding the client's real IP address.
import requests
# Request without a proxy
response = requests.get('https://httpbin.org/ip')
print("Real IP:", response.json())
# Request through a proxy
proxies = {
    'http': 'http://proxy-server:port',
    'https': 'https://proxy-server:port'
}
response = requests.get('https://httpbin.org/ip', proxies=proxies)
print("Proxy IP:", response.json())
1.2 Proxy Types
# HTTP proxy
http_proxy = {
    'http': 'http://username:password@proxy-server:port',
    'https': 'http://username:password@proxy-server:port'
}
# HTTPS proxy
https_proxy = {
    'http': 'https://username:password@proxy-server:port',
    'https': 'https://username:password@proxy-server:port'
}
# SOCKS proxy
socks_proxy = {
    'http': 'socks5://username:password@proxy-server:port',
    'https': 'socks5://username:password@proxy-server:port'
}
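Note that requests only speaks SOCKS when the optional SOCKS support is installed (`pip install requests[socks]`, which pulls in PySocks). A minimal sketch, assuming a reachable SOCKS5 proxy — the address below is a placeholder:

# Requires: pip install requests[socks]  (installs PySocks)
import requests

socks_proxies = {
    'http': 'socks5://127.0.0.1:1080',   # placeholder address
    'https': 'socks5://127.0.0.1:1080',
}
# Tip: the socks5h:// scheme also resolves DNS through the proxy, which leaks less information
try:
    r = requests.get('https://httpbin.org/ip', proxies=socks_proxies, timeout=10)
    print(r.json())
except requests.RequestException as e:
    print('SOCKS proxy request failed:', e)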
1.3 Proxy Validation
import requests
import time
from urllib.parse import urlparse
class ProxyValidator:
def __init__(self, timeout=10):
self.timeout = timeout
self.test_urls = [
'https://httpbin.org/ip',
'https://httpbin.org/get',
'https://www.google.com'
]
def validate_proxy(self, proxy_url):
"""验证代理是否可用"""
proxies = {
'http': proxy_url,
'https': proxy_url
}
try:
start_time = time.time()
response = requests.get(
self.test_urls[0],
proxies=proxies,
timeout=self.timeout
)
end_time = time.time()
if response.status_code == 200:
response_data = response.json()
return {
'proxy': proxy_url,
'status': 'valid',
'ip': response_data.get('origin', ''),
'response_time': end_time - start_time,
'anonymity': self.check_anonymity(response_data)
}
else:
return {
'proxy': proxy_url,
'status': 'invalid',
'error': f'HTTP {response.status_code}'
}
except Exception as e:
return {
'proxy': proxy_url,
'status': 'invalid',
'error': str(e)
}
def check_anonymity(self, response_data):
"""检查代理匿名性"""
headers = response_data.get('headers', {})
# 检查是否暴露真实IP
forwarded_headers = [
'X-Forwarded-For',
'X-Real-Ip',
'X-Originating-Ip',
'Client-Ip'
]
for header in forwarded_headers:
if header in headers:
return 'transparent' # 透明代理
# 检查是否暴露代理信息
proxy_headers = [
'Via',
'X-Proxy-Id',
'Proxy-Connection'
]
for header in proxy_headers:
if header in headers:
return 'anonymous' # 匿名代理
return 'elite' # 高匿代理
# Usage example
validator = ProxyValidator()
result = validator.validate_proxy('http://proxy-server:port')
print(result)
2. Building a Proxy Pool
2.1 A Basic Proxy Pool
import requests
import random
import time
import threading
from queue import Queue, Empty
import json
class ProxyPool:
def __init__(self, max_size=100):
self.max_size = max_size
self.proxies = Queue(maxsize=max_size)
self.bad_proxies = set()
self.lock = threading.Lock()
# 统计信息
self.stats = {
'total_proxies': 0,
'valid_proxies': 0,
'failed_proxies': 0,
'requests_made': 0
}
def add_proxy(self, proxy_url):
"""添加代理到池中"""
if proxy_url not in self.bad_proxies:
try:
self.proxies.put_nowait(proxy_url)
with self.lock:
self.stats['total_proxies'] += 1
return True
except:
return False
return False
def get_proxy(self):
"""从池中获取代理"""
try:
proxy = self.proxies.get_nowait()
return proxy
except Empty:
return None
def return_proxy(self, proxy_url, success=True):
"""归还代理到池中"""
if success:
self.add_proxy(proxy_url)
with self.lock:
self.stats['valid_proxies'] += 1
else:
self.bad_proxies.add(proxy_url)
with self.lock:
self.stats['failed_proxies'] += 1
def make_request(self, url, **kwargs):
"""使用代理池发起请求"""
max_retries = 3
for attempt in range(max_retries):
proxy = self.get_proxy()
if not proxy:
raise Exception("代理池为空")
proxies = {
'http': proxy,
'https': proxy
}
try:
response = requests.get(url, proxies=proxies, **kwargs)
self.return_proxy(proxy, success=True)
with self.lock:
self.stats['requests_made'] += 1
return response
except Exception as e:
self.return_proxy(proxy, success=False)
if attempt == max_retries - 1:
raise e
continue
def get_stats(self):
"""获取统计信息"""
with self.lock:
return self.stats.copy()
def size(self):
"""获取代理池大小"""
return self.proxies.qsize()
# Usage example
pool = ProxyPool()
# Add proxies
proxy_list = [
    'http://proxy1:port',
    'http://proxy2:port',
    'http://proxy3:port'
]
for proxy in proxy_list:
    pool.add_proxy(proxy)
# Make a request through the proxy pool
try:
    response = pool.make_request('https://httpbin.org/ip', timeout=10)
    print(response.json())
except Exception as e:
    print(f"Request failed: {e}")
print("Proxy pool stats:", pool.get_stats())
2.2 An Advanced Proxy Pool
import asyncio
import aiohttp
import time
import random
from dataclasses import dataclass
from typing import List, Optional
import logging
@dataclass
class ProxyInfo:
url: str
ip: str = ""
port: int = 0
protocol: str = "http"
anonymity: str = "unknown"
response_time: float = 0.0
success_count: int = 0
fail_count: int = 0
last_used: float = 0.0
last_checked: float = 0.0
@property
def success_rate(self):
total = self.success_count + self.fail_count
return self.success_count / total if total > 0 else 0.0
@property
def score(self):
"""代理评分(响应时间越短、成功率越高,分数越高)"""
if self.response_time == 0:
return 0
time_score = 1.0 / (self.response_time + 0.1)
rate_score = self.success_rate
return time_score * rate_score
class AdvancedProxyPool:
def __init__(self, max_size=200, check_interval=300):
self.max_size = max_size
self.check_interval = check_interval
self.proxies = {} # url -> ProxyInfo
self.lock = asyncio.Lock()
# 配置日志
self.logger = logging.getLogger(__name__)
# 启动后台检查任务
self.check_task = None
async def add_proxy(self, proxy_url):
"""添加代理"""
async with self.lock:
if proxy_url not in self.proxies:
proxy_info = ProxyInfo(url=proxy_url)
# 解析代理信息
if '://' in proxy_url:
protocol, address = proxy_url.split('://', 1)
proxy_info.protocol = protocol
if '@' in address:
auth, address = address.split('@', 1)
if ':' in address:
ip, port = address.split(':', 1)
proxy_info.ip = ip
proxy_info.port = int(port)
self.proxies[proxy_url] = proxy_info
self.logger.info(f"添加代理:{proxy_url}")
return True
return False
async def validate_proxy(self, proxy_info: ProxyInfo):
"""验证代理"""
test_url = 'https://httpbin.org/ip'
try:
connector = aiohttp.TCPConnector(
limit=10,
ttl_dns_cache=300,
use_dns_cache=True
)
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout
) as session:
start_time = time.time()
async with session.get(
test_url,
proxy=proxy_info.url
) as response:
end_time = time.time()
proxy_info.response_time = end_time - start_time
if response.status == 200:
data = await response.json()
proxy_info.success_count += 1
proxy_info.last_checked = time.time()
# 检查匿名性
headers = data.get('headers', {})
proxy_info.anonymity = self.check_anonymity(headers)
return True
else:
proxy_info.fail_count += 1
return False
except Exception as e:
proxy_info.fail_count += 1
self.logger.debug(f"代理验证失败:{proxy_info.url} - {e}")
return False
def check_anonymity(self, headers):
"""检查代理匿名性"""
forwarded_headers = [
'X-Forwarded-For', 'X-Real-Ip',
'X-Originating-Ip', 'Client-Ip'
]
proxy_headers = [
'Via', 'X-Proxy-Id', 'Proxy-Connection'
]
for header in forwarded_headers:
if header in headers:
return 'transparent'
for header in proxy_headers:
if header in headers:
return 'anonymous'
return 'elite'
async def get_best_proxy(self, min_score=0.1):
"""获取最佳代理"""
async with self.lock:
valid_proxies = [
proxy for proxy in self.proxies.values()
if proxy.success_rate > 0.5 and proxy.score >= min_score
]
if not valid_proxies:
return None
# 按评分排序,选择最佳代理
valid_proxies.sort(key=lambda x: x.score, reverse=True)
# 随机选择前10%的代理,避免总是使用同一个
top_count = max(1, len(valid_proxies) // 10)
selected = random.choice(valid_proxies[:top_count])
selected.last_used = time.time()
return selected
async def make_request(self, url, **kwargs):
"""使用代理发起请求"""
max_retries = 3
for attempt in range(max_retries):
proxy_info = await self.get_best_proxy()
if not proxy_info:
raise Exception("没有可用的代理")
try:
connector = aiohttp.TCPConnector(
limit=50,
ttl_dns_cache=300,
use_dns_cache=True
)
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout
) as session:
                    async with session.get(
                        url,
                        proxy=proxy_info.url,
                        **kwargs
                    ) as response:
                        # Cache the body before the session closes, so callers can
                        # still call response.json() / response.text() afterwards
                        await response.read()
                        proxy_info.success_count += 1
                        return response
except Exception as e:
proxy_info.fail_count += 1
self.logger.warning(f"请求失败:{proxy_info.url} - {e}")
if attempt == max_retries - 1:
raise e
continue
async def check_all_proxies(self):
"""检查所有代理"""
self.logger.info("开始检查所有代理...")
async with self.lock:
proxy_list = list(self.proxies.values())
# 并发检查代理
semaphore = asyncio.Semaphore(20)
async def check_proxy(proxy_info):
async with semaphore:
return await self.validate_proxy(proxy_info)
tasks = [check_proxy(proxy) for proxy in proxy_list]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 清理失效代理
async with self.lock:
to_remove = []
for proxy_info in proxy_list:
if proxy_info.success_rate < 0.1 and proxy_info.fail_count > 5:
to_remove.append(proxy_info.url)
for url in to_remove:
del self.proxies[url]
self.logger.info(f"移除失效代理:{url}")
valid_count = sum(1 for r in results if r is True)
self.logger.info(f"代理检查完成,有效代理:{valid_count}/{len(proxy_list)}")
async def start_background_check(self):
"""启动后台检查任务"""
async def check_loop():
while True:
try:
await self.check_all_proxies()
await asyncio.sleep(self.check_interval)
except Exception as e:
self.logger.error(f"后台检查出错:{e}")
await asyncio.sleep(60)
self.check_task = asyncio.create_task(check_loop())
async def stop_background_check(self):
"""停止后台检查任务"""
if self.check_task:
self.check_task.cancel()
try:
await self.check_task
except asyncio.CancelledError:
pass
async def get_stats(self):
"""获取统计信息"""
async with self.lock:
total = len(self.proxies)
valid = sum(1 for p in self.proxies.values() if p.success_rate > 0.5)
avg_response_time = 0
if total > 0:
total_time = sum(p.response_time for p in self.proxies.values())
avg_response_time = total_time / total
return {
'total_proxies': total,
'valid_proxies': valid,
'avg_response_time': avg_response_time,
'anonymity_distribution': self.get_anonymity_stats()
}
def get_anonymity_stats(self):
"""获取匿名性统计"""
stats = {'elite': 0, 'anonymous': 0, 'transparent': 0, 'unknown': 0}
for proxy in self.proxies.values():
stats[proxy.anonymity] += 1
return stats
# Usage example
async def main():
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    pool = AdvancedProxyPool()
    # Add proxies
    proxy_list = [
        'http://proxy1:port',
        'http://proxy2:port',
        'http://proxy3:port'
    ]
    for proxy in proxy_list:
        await pool.add_proxy(proxy)
    # Start the background health check
    await pool.start_background_check()
    try:
        # Use the proxy pool
        response = await pool.make_request('https://httpbin.org/ip')
        print(await response.json())
        # Fetch statistics
        stats = await pool.get_stats()
        print("Proxy pool stats:", stats)
    finally:
        await pool.stop_background_check()
# Run the example
# asyncio.run(main())
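One caveat when moving from requests to aiohttp: aiohttp's proxy= argument expects a plain HTTP proxy URL, so SOCKS proxies are usually handled with the third-party aiohttp-socks package instead. A minimal sketch, assuming `pip install aiohttp-socks` and a placeholder SOCKS5 address:

# Requires: pip install aiohttp-socks
import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector

async def fetch_via_socks():
    # The connector routes every request of this session through the SOCKS proxy
    connector = ProxyConnector.from_url('socks5://127.0.0.1:1080')  # placeholder
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://httpbin.org/ip') as resp:
            print(await resp.json())

# asyncio.run(fetch_via_socks())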
3. Obtaining Free Proxies
3.1 Scraping Proxy Listing Sites
import requests
from bs4 import BeautifulSoup
import re
import time
import random
class FreeProxyCollector:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
self.proxy_sources = [
{
'name': 'free-proxy-list',
'url': 'https://free-proxy-list.net/',
'parser': self.parse_free_proxy_list
},
{
'name': 'proxy-list',
'url': 'https://www.proxy-list.download/api/v1/get?type=http',
'parser': self.parse_proxy_list_download
}
]
def parse_free_proxy_list(self, html):
"""解析 free-proxy-list.net"""
soup = BeautifulSoup(html, 'html.parser')
proxies = []
table = soup.find('table', {'id': 'proxylisttable'})
if table:
rows = table.find_all('tr')[1:] # 跳过表头
for row in rows:
cols = row.find_all('td')
if len(cols) >= 7:
ip = cols[0].text.strip()
port = cols[1].text.strip()
country = cols[2].text.strip()
anonymity = cols[4].text.strip()
https = cols[6].text.strip()
protocol = 'https' if https == 'yes' else 'http'
proxy_url = f"{protocol}://{ip}:{port}"
proxies.append({
'url': proxy_url,
'ip': ip,
'port': int(port),
'country': country,
'anonymity': anonymity.lower(),
'protocol': protocol
})
return proxies
def parse_proxy_list_download(self, content):
"""解析 proxy-list.download API"""
proxies = []
lines = content.strip().split('\n')
for line in lines:
if ':' in line:
ip, port = line.strip().split(':', 1)
proxy_url = f"http://{ip}:{port}"
proxies.append({
'url': proxy_url,
'ip': ip,
'port': int(port),
'country': 'unknown',
'anonymity': 'unknown',
'protocol': 'http'
})
return proxies
def collect_from_source(self, source):
"""从单个源收集代理"""
try:
print(f"正在收集代理:{source['name']}")
response = self.session.get(source['url'], timeout=15)
response.raise_for_status()
            # Both sources return text that their parser handles directly
            proxies = source['parser'](response.text)
print(f"从 {source['name']} 收集到 {len(proxies)} 个代理")
return proxies
except Exception as e:
print(f"收集代理失败:{source['name']} - {e}")
return []
def collect_all_proxies(self):
"""收集所有源的代理"""
all_proxies = []
for source in self.proxy_sources:
proxies = self.collect_from_source(source)
all_proxies.extend(proxies)
# 随机延迟,避免被封
time.sleep(random.uniform(1, 3))
# 去重
unique_proxies = {}
for proxy in all_proxies:
unique_proxies[proxy['url']] = proxy
result = list(unique_proxies.values())
print(f"总共收集到 {len(result)} 个唯一代理")
return result
def validate_proxies(self, proxies, max_workers=20):
"""验证代理列表"""
from concurrent.futures import ThreadPoolExecutor, as_completed
validator = ProxyValidator(timeout=10)
valid_proxies = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_proxy = {
executor.submit(validator.validate_proxy, proxy['url']): proxy
for proxy in proxies
}
for future in as_completed(future_to_proxy):
proxy = future_to_proxy[future]
try:
result = future.result()
if result['status'] == 'valid':
valid_proxies.append(result)
print(f"✓ 有效代理:{result['proxy']} - {result['response_time']:.2f}s")
else:
print(f"✗ 无效代理:{result['proxy']}")
except Exception as e:
print(f"✗ 验证出错:{proxy['url']} - {e}")
return valid_proxies
# Usage example
collector = FreeProxyCollector()
# Collect proxies
proxies = collector.collect_all_proxies()
# Validate proxies
valid_proxies = collector.validate_proxies(proxies[:50])  # only validate the first 50
print(f"Validation finished, valid proxies: {len(valid_proxies)}")
# Save the valid proxies
import json
with open('valid_proxies.json', 'w') as f:
    json.dump(valid_proxies, f, indent=2)
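The saved JSON can later be fed straight back into the ProxyPool from section 2.1, so validation does not have to be repeated on every run. A minimal sketch, assuming valid_proxies.json was written by the code above:

import json

# Reload previously validated proxies and seed the pool with them
with open('valid_proxies.json', 'r') as f:
    saved = json.load(f)

pool = ProxyPool()
for entry in saved:
    pool.add_proxy(entry['proxy'])  # 'proxy' is the key written by ProxyValidator
print(f"Seeded pool with {pool.size()} proxies")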
3.2 Proxy API Integration
import requests
import json
import time
class ProxyAPI:
def __init__(self):
self.apis = {
'proxylist': {
'url': 'https://www.proxy-list.download/api/v1/get',
'params': {'type': 'http'},
'parser': self.parse_text_list
},
'gimmeproxy': {
'url': 'https://gimmeproxy.com/api/getProxy',
'params': {'format': 'json', 'protocol': 'http'},
'parser': self.parse_gimmeproxy
}
}
def parse_text_list(self, content):
"""解析文本格式的代理列表"""
proxies = []
lines = content.strip().split('\n')
for line in lines:
if ':' in line:
ip, port = line.strip().split(':', 1)
proxies.append(f"http://{ip}:{port}")
return proxies
def parse_gimmeproxy(self, content):
"""解析 gimmeproxy API 响应"""
try:
data = json.loads(content)
proxy_url = f"{data['protocol']}://{data['ip']}:{data['port']}"
return [proxy_url]
except:
return []
def get_proxies_from_api(self, api_name, count=10):
"""从指定API获取代理"""
if api_name not in self.apis:
raise ValueError(f"未知的API:{api_name}")
api_config = self.apis[api_name]
proxies = []
for i in range(count):
try:
response = requests.get(
api_config['url'],
params=api_config['params'],
timeout=10
)
if response.status_code == 200:
new_proxies = api_config['parser'](response.text)
proxies.extend(new_proxies)
if api_name == 'gimmeproxy':
time.sleep(1) # gimmeproxy 有速率限制
except Exception as e:
print(f"API请求失败:{api_name} - {e}")
break
return list(set(proxies)) # 去重
def get_all_proxies(self, count_per_api=20):
"""从所有API获取代理"""
all_proxies = []
for api_name in self.apis:
print(f"正在从 {api_name} 获取代理...")
proxies = self.get_proxies_from_api(api_name, count_per_api)
all_proxies.extend(proxies)
print(f"从 {api_name} 获取到 {len(proxies)} 个代理")
return list(set(all_proxies))
# Usage example
api = ProxyAPI()
proxies = api.get_all_proxies(count_per_api=10)
print(f"Fetched {len(proxies)} proxies in total")
4. Anti-Anti-Crawler Techniques
4.1 User-Agent Rotation
import random
import requests
import time
class UserAgentRotator:
def __init__(self):
self.user_agents = [
# Chrome
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
# Firefox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',
# Safari
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Safari/605.1.15',
# Edge
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36 Edg/90.0.818.66'
]
self.mobile_agents = [
# Mobile Chrome
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Android 11; Mobile; rv:68.0) Gecko/68.0 Firefox/88.0',
'Mozilla/5.0 (Linux; Android 11; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36'
]
def get_random_desktop_ua(self):
"""获取随机桌面用户代理"""
return random.choice(self.user_agents)
def get_random_mobile_ua(self):
"""获取随机移动用户代理"""
return random.choice(self.mobile_agents)
def get_random_ua(self, mobile_ratio=0.3):
"""获取随机用户代理(可指定移动端比例)"""
if random.random() < mobile_ratio:
return self.get_random_mobile_ua()
else:
return self.get_random_desktop_ua()
class AntiDetectionSpider:
def __init__(self, proxy_pool=None):
self.ua_rotator = UserAgentRotator()
self.proxy_pool = proxy_pool
self.session = requests.Session()
# 请求间隔配置
self.min_delay = 1
self.max_delay = 3
self.last_request_time = 0
def get_headers(self, url=None):
"""生成随机请求头"""
headers = {
'User-Agent': self.ua_rotator.get_random_ua(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': random.choice([
'en-US,en;q=0.9',
'zh-CN,zh;q=0.9,en;q=0.8',
'ja-JP,ja;q=0.9,en;q=0.8'
]),
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
# 随机添加一些可选头
optional_headers = {
'Cache-Control': random.choice(['no-cache', 'max-age=0']),
'Pragma': 'no-cache',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1'
}
# 随机添加部分可选头
for key, value in optional_headers.items():
if random.random() > 0.5:
headers[key] = value
# 如果有referer,添加referer头
if url and random.random() > 0.3:
headers['Referer'] = url
return headers
def wait_between_requests(self):
"""请求间隔控制"""
current_time = time.time()
elapsed = current_time - self.last_request_time
min_interval = random.uniform(self.min_delay, self.max_delay)
if elapsed < min_interval:
sleep_time = min_interval - elapsed
time.sleep(sleep_time)
self.last_request_time = time.time()
def make_request(self, url, **kwargs):
"""发起请求"""
self.wait_between_requests()
# 设置请求头
headers = self.get_headers(url)
kwargs.setdefault('headers', {}).update(headers)
# 设置代理
if self.proxy_pool:
proxy = self.proxy_pool.get_proxy()
if proxy:
kwargs['proxies'] = {
'http': proxy,
'https': proxy
}
# 设置超时
kwargs.setdefault('timeout', 15)
        try:
            response = self.session.get(url, **kwargs)
            # Check whether the response looks like an anti-bot block page
            if self.is_blocked(response):
                raise Exception("Request blocked; an anti-crawler mechanism was probably triggered")
            # On success, hand the proxy back to the pool as healthy
            if self.proxy_pool and 'proxies' in kwargs:
                self.proxy_pool.return_proxy(kwargs['proxies']['http'], success=True)
            return response
        except Exception as e:
            # If a proxy was used and the request failed, mark it as bad
            if self.proxy_pool and 'proxies' in kwargs:
                self.proxy_pool.return_proxy(
                    kwargs['proxies']['http'],
                    success=False
                )
            raise e
def is_blocked(self, response):
"""检测是否被反爬虫阻止"""
# 检查状态码
if response.status_code in [403, 429, 503]:
return True
# 检查响应内容
content = response.text.lower()
blocked_keywords = [
'access denied',
'blocked',
'captcha',
'verification',
'robot',
'bot detection',
'访问被拒绝',
'验证码',
'机器人检测'
]
for keyword in blocked_keywords:
if keyword in content:
return True
# 检查响应大小(异常小的响应可能是阻止页面)
if len(content) < 100:
return True
return False
# Usage example
spider = AntiDetectionSpider()
try:
    response = spider.make_request('https://httpbin.org/headers')
    print("Request succeeded")
    print("Headers seen by the server:", response.json())
except Exception as e:
    print(f"Request failed: {e}")
4.2 Request Header Optimization
import random
import time
from urllib.parse import urlparse
class HeaderOptimizer:
def __init__(self):
self.browser_versions = {
'chrome': ['91.0.4472.124', '90.0.4430.212', '89.0.4389.128'],
'firefox': ['89.0', '88.0', '87.0'],
'safari': ['14.1.1', '14.0.3', '13.1.2'],
'edge': ['91.0.864.59', '90.0.818.66', '89.0.774.68']
}
self.os_versions = {
'windows': ['Windows NT 10.0; Win64; x64', 'Windows NT 6.1; Win64; x64'],
'macos': ['Macintosh; Intel Mac OS X 10_15_7', 'Macintosh; Intel Mac OS X 10_14_6'],
'linux': ['X11; Linux x86_64', 'X11; Ubuntu; Linux x86_64']
}
def generate_realistic_headers(self, target_url=None):
"""生成真实的浏览器请求头"""
# 选择浏览器和操作系统
browser = random.choice(['chrome', 'firefox', 'safari', 'edge'])
os_type = random.choice(['windows', 'macos', 'linux'])
# 生成User-Agent
user_agent = self.generate_user_agent(browser, os_type)
# 基础请求头
headers = {
'User-Agent': user_agent,
'Accept': self.get_accept_header(browser),
'Accept-Language': self.get_accept_language(),
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1'
}
# 根据浏览器添加特定头
if browser == 'chrome':
headers.update({
'sec-ch-ua': f'"Chromium";v="{self.browser_versions[browser][0].split(".")[0]}", "Google Chrome";v="{self.browser_versions[browser][0].split(".")[0]}", ";Not A Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': f'"{os_type.title()}"'
})
# 添加缓存控制
if random.random() > 0.5:
headers['Cache-Control'] = random.choice(['no-cache', 'max-age=0'])
headers['Pragma'] = 'no-cache'
# 添加DNT头
if random.random() > 0.3:
headers['DNT'] = '1'
# 如果有目标URL,添加相关头
if target_url:
parsed = urlparse(target_url)
# 添加Host头
headers['Host'] = parsed.netloc
# 随机添加Referer
if random.random() > 0.4:
referers = [
'https://www.google.com/',
'https://www.bing.com/',
'https://www.baidu.com/',
f'https://{parsed.netloc}/'
]
headers['Referer'] = random.choice(referers)
return headers
def generate_user_agent(self, browser, os_type):
"""生成用户代理字符串"""
browser_version = random.choice(self.browser_versions[browser])
os_version = random.choice(self.os_versions[os_type])
if browser == 'chrome':
webkit_version = '537.36'
return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{browser_version} Safari/{webkit_version}'
elif browser == 'firefox':
return f'Mozilla/5.0 ({os_version}; rv:{browser_version}) Gecko/20100101 Firefox/{browser_version}'
elif browser == 'safari':
webkit_version = '605.1.15'
return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Version/{browser_version} Safari/{webkit_version}'
elif browser == 'edge':
webkit_version = '537.36'
chrome_version = '91.0.4472.124' # Edge基于Chromium
return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{webkit_version} Edg/{browser_version}'
def get_accept_header(self, browser):
"""获取Accept头"""
accept_headers = {
'chrome': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'firefox': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'safari': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'edge': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
}
return accept_headers.get(browser, accept_headers['chrome'])
def get_accept_language(self):
"""获取Accept-Language头"""
languages = [
'en-US,en;q=0.9',
'zh-CN,zh;q=0.9,en;q=0.8',
'ja-JP,ja;q=0.9,en;q=0.8',
'ko-KR,ko;q=0.9,en;q=0.8',
'de-DE,de;q=0.9,en;q=0.8',
'fr-FR,fr;q=0.9,en;q=0.8'
]
return random.choice(languages)
# Usage example
optimizer = HeaderOptimizer()
# Generate request headers tailored to the target URL
headers = optimizer.generate_realistic_headers('https://httpbin.org/headers')
print("Generated headers:")
for key, value in headers.items():
    print(f"{key}: {value}")
# Make a request with the generated headers
import requests
response = requests.get('https://httpbin.org/headers', headers=headers)
print("\nHeaders received by the server:")
print(response.json())
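In a longer crawl you would typically regenerate the headers for every request rather than reuse a single set. A minimal sketch reusing the optimizer above, with placeholder URLs:

import time
import random
import requests

session = requests.Session()
urls = ['https://httpbin.org/headers', 'https://httpbin.org/user-agent']  # placeholders
for url in urls:
    # Fresh, per-request headers so consecutive requests do not share one fingerprint
    resp = session.get(url, headers=optimizer.generate_realistic_headers(url), timeout=10)
    print(url, resp.status_code)
    time.sleep(random.uniform(1, 3))  # polite, randomized pacing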
4.3 Session Management and Cookie Handling
import requests
import json
import time
import random
from http.cookiejar import LWPCookieJar
class SessionManager:
def __init__(self, cookie_file=None):
self.session = requests.Session()
self.cookie_file = cookie_file
# 设置Cookie jar
if cookie_file:
self.session.cookies = LWPCookieJar(cookie_file)
try:
self.session.cookies.load(ignore_discard=True)
except FileNotFoundError:
pass
# 会话配置
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
# 请求历史
self.request_history = []
def save_cookies(self):
"""保存Cookie到文件"""
if self.cookie_file:
self.session.cookies.save(ignore_discard=True)
def load_cookies(self):
"""从文件加载Cookie"""
if self.cookie_file:
try:
self.session.cookies.load(ignore_discard=True)
return True
except FileNotFoundError:
return False
return False
def clear_cookies(self):
"""清除所有Cookie"""
self.session.cookies.clear()
    def get_cookies_dict(self):
        """Return the session cookies as a plain dict."""
        return requests.utils.dict_from_cookiejar(self.session.cookies)
    def set_cookies(self, cookies_dict):
        """Set cookies from a dict (works for both RequestsCookieJar and LWPCookieJar)."""
        for name, value in cookies_dict.items():
            self.session.cookies.set_cookie(
                requests.cookies.create_cookie(name=name, value=value)
            )
def simulate_browser_behavior(self, url):
"""模拟真实浏览器行为"""
# 1. 首先访问主页
domain = '/'.join(url.split('/')[:3])
try:
# 访问主页
response = self.session.get(domain, timeout=10)
time.sleep(random.uniform(1, 3))
# 模拟查看页面
if random.random() > 0.5:
# 随机访问一些常见页面
common_pages = ['/about', '/contact', '/help', '/sitemap']
page = random.choice(common_pages)
try:
self.session.get(f"{domain}{page}", timeout=10)
time.sleep(random.uniform(0.5, 2))
except:
pass
# 最后访问目标页面
response = self.session.get(url, timeout=10)
# 记录请求历史
self.request_history.append({
'url': url,
'timestamp': time.time(),
'status_code': response.status_code
})
return response
except Exception as e:
print(f"模拟浏览器行为失败:{e}")
return None
def handle_login(self, login_url, username, password,
username_field='username', password_field='password'):
"""处理登录"""
try:
# 获取登录页面
response = self.session.get(login_url)
# 这里可以解析页面获取CSRF token等
# soup = BeautifulSoup(response.text, 'html.parser')
# csrf_token = soup.find('input', {'name': 'csrf_token'})['value']
# 准备登录数据
login_data = {
username_field: username,
password_field: password
}
# 发送登录请求
response = self.session.post(login_url, data=login_data)
# 检查登录是否成功
if self.is_login_successful(response):
print("登录成功")
self.save_cookies()
return True
else:
print("登录失败")
return False
except Exception as e:
print(f"登录过程出错:{e}")
return False
def is_login_successful(self, response):
"""检查登录是否成功"""
# 检查状态码
if response.status_code != 200:
return False
# 检查URL重定向
if 'login' in response.url.lower():
return False
# 检查页面内容
content = response.text.lower()
success_indicators = ['dashboard', 'profile', 'logout', '退出', '个人中心']
failure_indicators = ['error', 'invalid', 'incorrect', '错误', '失败']
has_success = any(indicator in content for indicator in success_indicators)
has_failure = any(indicator in content for indicator in failure_indicators)
return has_success and not has_failure
def get_session_info(self):
"""获取会话信息"""
return {
'cookies_count': len(self.session.cookies),
            'cookies': self.get_cookies_dict(),
'headers': dict(self.session.headers),
'request_count': len(self.request_history)
}
# Usage example
session_manager = SessionManager(cookie_file='session_cookies.txt')
# Visit the site while simulating browser behaviour
response = session_manager.simulate_browser_behavior('https://example.com/target-page')
if response:
    print(f"Visit succeeded, status code: {response.status_code}")
# Inspect the session
info = session_manager.get_session_info()
print(f"Cookie count: {info['cookies_count']}")
print(f"Requests made: {info['request_count']}")
# Save cookies
session_manager.save_cookies()
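If you want to reuse the cookies outside of requests (for example in a browser-automation tool), exporting them as plain JSON is often more convenient than the LWP file format. A minimal sketch using the helper methods above; cookies.json is a placeholder filename:

import json

# Dump the current cookies to JSON ...
with open('cookies.json', 'w', encoding='utf-8') as f:
    json.dump(session_manager.get_cookies_dict(), f, ensure_ascii=False, indent=2)

# ... and load them back into a fresh session later
new_manager = SessionManager()
with open('cookies.json', 'r', encoding='utf-8') as f:
    new_manager.set_cookies(json.load(f))
print(new_manager.get_cookies_dict())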
5. Hands-On Case Study: An E-Commerce Product Crawler
import asyncio
import aiohttp
import json
import time
import random
from dataclasses import dataclass
from typing import List, Optional
import logging
@dataclass
class Product:
title: str
price: str
url: str
image_url: str = ""
rating: str = ""
reviews_count: str = ""
seller: str = ""
class EcommerceSpider:
def __init__(self, proxy_pool=None, max_concurrent=10):
self.proxy_pool = proxy_pool
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
# 用户代理轮换
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
]
# 统计信息
self.stats = {
'pages_crawled': 0,
'products_found': 0,
'errors': 0,
'proxy_failures': 0
}
# 配置日志
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def get_session(self):
"""创建HTTP会话"""
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=self.max_concurrent,
ttl_dns_cache=300,
use_dns_cache=True
)
timeout = aiohttp.ClientTimeout(total=30)
return aiohttp.ClientSession(
connector=connector,
timeout=timeout
)
def get_headers(self):
"""获取随机请求头"""
return {
'User-Agent': random.choice(self.user_agents),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9,zh-CN,zh;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
async def fetch_with_retry(self, session, url, max_retries=3):
"""带重试的请求"""
async with self.semaphore:
for attempt in range(max_retries):
proxy = None
# 获取代理
if self.proxy_pool:
proxy_info = await self.proxy_pool.get_best_proxy()
proxy = proxy_info.url if proxy_info else None
try:
# 随机延迟
await asyncio.sleep(random.uniform(1, 3))
headers = self.get_headers()
async with session.get(
url,
headers=headers,
proxy=proxy
) as response:
if response.status == 200:
content = await response.text()
self.stats['pages_crawled'] += 1
# 标记代理成功
if proxy and self.proxy_pool:
proxy_info.success_count += 1
return content
elif response.status in [403, 429, 503]:
# 可能被反爬虫阻止
self.logger.warning(f"请求被阻止:{url} - 状态码:{response.status}")
if proxy and self.proxy_pool:
proxy_info.fail_count += 1
self.stats['proxy_failures'] += 1
if attempt < max_retries - 1:
await asyncio.sleep(random.uniform(5, 10))
continue
else:
self.logger.error(f"HTTP错误:{url} - 状态码:{response.status}")
except Exception as e:
self.logger.error(f"请求失败:{url} - {e}")
if proxy and self.proxy_pool:
proxy_info.fail_count += 1
self.stats['proxy_failures'] += 1
if attempt < max_retries - 1:
await asyncio.sleep(random.uniform(2, 5))
continue
self.stats['errors'] += 1
return None
def parse_product_list(self, html, base_url):
"""解析商品列表页面"""
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
products = []
# 根据实际网站结构调整选择器
product_items = soup.select('.product-item') # 示例选择器
for item in product_items:
try:
# 商品标题
title_elem = item.select_one('.product-title')
title = title_elem.get_text(strip=True) if title_elem else ""
# 商品价格
price_elem = item.select_one('.product-price')
price = price_elem.get_text(strip=True) if price_elem else ""
# 商品链接
link_elem = item.select_one('a')
url = link_elem.get('href') if link_elem else ""
if url and not url.startswith('http'):
url = base_url + url
# 商品图片
img_elem = item.select_one('img')
image_url = img_elem.get('src') if img_elem else ""
# 评分
rating_elem = item.select_one('.rating')
rating = rating_elem.get_text(strip=True) if rating_elem else ""
# 评论数
reviews_elem = item.select_one('.reviews-count')
reviews_count = reviews_elem.get_text(strip=True) if reviews_elem else ""
if title and price and url:
product = Product(
title=title,
price=price,
url=url,
image_url=image_url,
rating=rating,
reviews_count=reviews_count
)
products.append(product)
except Exception as e:
self.logger.error(f"解析商品失败:{e}")
continue
self.stats['products_found'] += len(products)
return products
async def crawl_search_results(self, keyword, max_pages=5):
"""爬取搜索结果"""
all_products = []
base_url = "https://example-shop.com" # 替换为实际网站
async with await self.get_session() as session:
tasks = []
# 创建所有页面的任务
for page in range(1, max_pages + 1):
search_url = f"{base_url}/search?q={keyword}&page={page}"
task = self.crawl_single_page(session, search_url, base_url)
tasks.append(task)
# 并发执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 收集结果
for result in results:
if isinstance(result, list):
all_products.extend(result)
elif isinstance(result, Exception):
self.logger.error(f"页面爬取失败:{result}")
return all_products
async def crawl_single_page(self, session, url, base_url):
"""爬取单个页面"""
html = await self.fetch_with_retry(session, url)
if html:
products = self.parse_product_list(html, base_url)
self.logger.info(f"页面 {url} 找到 {len(products)} 个商品")
return products
else:
self.logger.error(f"无法获取页面:{url}")
return []
async def save_products(self, products, filename='products.json'):
"""保存商品数据"""
data = []
for product in products:
data.append({
'title': product.title,
'price': product.price,
'url': product.url,
'image_url': product.image_url,
'rating': product.rating,
'reviews_count': product.reviews_count,
'seller': product.seller
})
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
self.logger.info(f"已保存 {len(products)} 个商品到 {filename}")
def print_stats(self):
"""打印统计信息"""
self.logger.info("=== 爬取统计 ===")
self.logger.info(f"页面爬取:{self.stats['pages_crawled']}")
self.logger.info(f"商品发现:{self.stats['products_found']}")
self.logger.info(f"错误次数:{self.stats['errors']}")
self.logger.info(f"代理失败:{self.stats['proxy_failures']}")
# Usage example
async def main():
    # Create a proxy pool (optional)
    proxy_pool = AdvancedProxyPool()
    # Add a few proxies
    proxy_list = [
        'http://proxy1:port',
        'http://proxy2:port',
        'http://proxy3:port'
    ]
    for proxy in proxy_list:
        await proxy_pool.add_proxy(proxy)
    # Start the background health check
    await proxy_pool.start_background_check()
    try:
        # Create the spider
        spider = EcommerceSpider(proxy_pool=proxy_pool, max_concurrent=5)
        # Crawl products
        products = await spider.crawl_search_results('手机', max_pages=3)
        # Save the results
        await spider.save_products(products)
        # Print statistics
        spider.print_stats()
    finally:
        await proxy_pool.stop_background_check()
# Run the crawler
# asyncio.run(main())
6. Exercises
6.1 Basic Exercises
- Proxy validator: write a proxy validator that checks availability, anonymity, and response time
- User-agent rotation: implement a user-agent rotator that supports both desktop and mobile UAs
- Request header optimization: build a header generator that produces realistic browser request headers
6.2 Advanced Exercises
- Proxy pool management: build a complete proxy pool system covering proxy acquisition, validation, rotation, and failure handling
- Anti-anti-crawler system: design an integrated anti-anti-crawler system that combines proxies, UA rotation, request pacing, and more
- Session management: implement a smart session manager that handles login, cookie management, and session persistence
6.3 Hands-On Project
Pick an e-commerce site and implement the following:
- Crawl product information through a proxy pool
- Detect and respond to anti-crawler measures
- Handle dynamically loaded content
- Clean and store the data
7. FAQ
Q1: How can I tell whether a proxy has been detected?
A: Look for the following signals (a small helper combining them follows this list):
- Response status codes such as 403, 429, or 503
- Response bodies containing CAPTCHAs or block notices
- Abnormally long response times
- A returned IP address that is not the proxy's IP
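A minimal sketch that rolls these signals into one check; the keyword list is illustrative, not exhaustive:

import requests

BLOCK_KEYWORDS = ('captcha', 'access denied', 'verify you are human', '验证码')

def looks_blocked(response, expected_ip=None):
    """Heuristic check for the detection signals listed above."""
    if response.status_code in (403, 429, 503):
        return True
    body = response.text.lower()
    if any(keyword in body for keyword in BLOCK_KEYWORDS):
        return True
    # For endpoints like httpbin.org/ip, also confirm the exit IP is the proxy's
    if expected_ip and expected_ip not in response.text:
        return True
    return False

# r = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
# print(looks_blocked(r, expected_ip='1.2.3.4'))  # '1.2.3.4' is a placeholder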
Q2: What should I do when free proxies are unstable?
A: Suggestions:
- Build a large proxy pool so you can rotate quickly
- Check proxy availability continuously
- Pull from multiple proxy sources
- Consider a paid proxy service
Q3: How do I avoid getting my IP banned?
A: Take the following measures (a throttling sketch follows this list):
- Throttle the request rate
- Rotate proxies
- Simulate real user behaviour
- Randomize request parameters
- Handle CAPTCHAs and logins
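Request pacing is the measure most often skipped. A minimal throttling sketch that enforces a randomized minimum interval between requests; the delay bounds are arbitrary examples:

import time
import random

class Throttle:
    """Enforce a randomized minimum delay between consecutive requests."""
    def __init__(self, min_delay=1.0, max_delay=3.0):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request = 0.0

    def wait(self):
        interval = random.uniform(self.min_delay, self.max_delay)
        elapsed = time.time() - self.last_request
        if elapsed < interval:
            time.sleep(interval - elapsed)
        self.last_request = time.time()

throttle = Throttle()
# throttle.wait()  # call this before every request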
8. Coming Up Next
In the next lesson, "Python Web Scraping Lesson 9: CAPTCHA Recognition and Automated Handling", we will cover:
- CAPTCHA types and recognition techniques
- OCR text recognition
- Image CAPTCHA handling
- Slider CAPTCHA solving
- Third-party CAPTCHA-solving services
With this lesson you have learned the core techniques of proxy pool construction and anti-anti-crawler work. These skills will help you deal with a wide range of anti-scraping challenges and improve the stability and success rate of your crawlers.
I hope this helps beginners; I am just a small programmer devoted to office automation.
I would love to get a 【❤️ free follow ❤️】 from you, thank you!
A 🤞 follow 🤞 + ❤️ like ❤️ + 👍 bookmark 👍 would mean a lot
There is also an office automation column, subscriptions welcome: Python Office Automation column
There is also a web scraping column, subscriptions welcome: Python Web Scraping Basics column
There is also a Python basics column, subscriptions welcome: Python Basics column
