Python Web Scraping, Lesson 8: Proxy Pools and Anti-Anti-Scraping Techniques
2025-10-23

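Course Objectives

  • Understand how proxies work and the main proxy types
  • Build and manage a proxy pool
  • Rotate IPs and apply anti-anti-scraping techniques
  • Disguise the User-Agent and tune request headers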

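1. Proxy Basics

1.1 What Is a Proxy?

A proxy server sits between the client and the target server and forwards requests on the client's behalf, so the target server sees the proxy's IP address instead of the client's real one.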

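import requests

# Request without a proxy
response = requests.get('https://httpbin.org/ip', timeout=10)
print("Real IP:", response.json())

# Request through a proxy (replace proxy-server:port with a real host and port).
# The dictionary key is the scheme of the target URL; the value is the URL of
# the proxy itself. Most HTTP proxies are reached over http://, even when the
# target URL is https://.
proxies = {
    'http': 'http://proxy-server:port',
    'https': 'http://proxy-server:port'
}

response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
print("Proxy IP:", response.json())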

1.2 Proxy Types

# HTTP proxy: the client talks to the proxy over plain HTTP
http_proxy = {
    'http': 'http://username:password@proxy-server:port',
    'https': 'http://username:password@proxy-server:port'
}

# HTTPS proxy: the connection to the proxy itself is encrypted with TLS
https_proxy = {
    'http': 'https://username:password@proxy-server:port',
    'https': 'https://username:password@proxy-server:port'
}

# SOCKS5 proxy (requires the extra dependency: pip install "requests[socks]")
socks_proxy = {
    'http': 'socks5://username:password@proxy-server:port',
    'https': 'socks5://username:password@proxy-server:port'
}
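
The three dictionaries above differ only in the scheme of the proxy URL, so they can be produced by one small helper. The sketch below is an illustration, not part of the original lesson: `build_proxies` is a hypothetical name, and it percent-encodes the credentials so that characters such as `@` or `:` in a password do not break the URL.

```python
from urllib.parse import quote


def build_proxies(host, port, scheme="http", username=None, password=None):
    """Return a requests-style proxies dict for a single proxy server."""
    auth = ""
    if username is not None:
        # Percent-encode credentials so '@' or ':' cannot corrupt the URL.
        auth = quote(username, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")
        auth += "@"
    proxy_url = f"{scheme}://{auth}{host}:{port}"
    # The same proxy is used for both http:// and https:// target URLs.
    return {"http": proxy_url, "https": proxy_url}


# Example with placeholder values (no request is sent here)
print(build_proxies("127.0.0.1", 8080))
print(build_proxies("127.0.0.1", 1080, scheme="socks5",
                    username="user", password="p@ss:word"))
```

The returned dictionary can be passed directly as the `proxies` argument of `requests.get`.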

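1.3 Validating a Proxy

import time

import requests

class ProxyValidator:
    def __init__(self, timeout=10):
        self.timeout = timeout
        # The first URL is the one used for validation. /get echoes both the
        # caller's IP ('origin') and the request headers that check_anonymity()
        # inspects; plain HTTP lets headers added by the proxy show up there.
        self.test_urls = [
            'http://httpbin.org/get',
            'https://httpbin.org/ip',
            'https://www.google.com'
        ]
    
    def validate_proxy(self, proxy_url):
        """Check whether a proxy works and measure its response time."""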
        proxies = {
            'http': proxy_url,
            'https': proxy_url
        }
        
        try:
            start_time = time.time()
            response = requests.get(
                self.test_urls[0], 
                proxies=proxies, 
                timeout=self.timeout
            )
            end_time = time.time()
            
            if response.status_code == 200:
                response_data = response.json()
                return {
                    'proxy': proxy_url,
                    'status': 'valid',
                    'ip': response_data.get('origin', ''),
                    'response_time': end_time - start_time,
                    'anonymity': self.check_anonymity(response_data)
                }
            else:
                return {
                    'proxy': proxy_url,
                    'status': 'invalid',
                    'error': f'HTTP {response.status_code}'
                }
                
        except Exception as e:
            return {
                'proxy': proxy_url,
                'status': 'invalid',
                'error': str(e)
            }
    
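    def check_anonymity(self, response_data):
        """Classify the proxy's anonymity from the headers the server saw."""
        # Header-based detection only works for plain-HTTP targets; for HTTPS
        # targets the proxy tunnels the traffic and cannot add headers.
        headers = response_data.get('headers', {})
        
        # Headers that may expose the real client IP
        forwarded_headers = [
            'X-Forwarded-For',
            'X-Real-Ip',
            'X-Originating-Ip',
            'Client-Ip'
        ]
        
        for header in forwarded_headers:
            if header in headers:
                return 'transparent'  # transparent proxy
        
        # Headers that reveal a proxy is in use
        proxy_headers = [
            'Via',
            'X-Proxy-Id',
            'Proxy-Connection'
        ]
        
        for header in proxy_headers:
            if header in headers:
                return 'anonymous'  # anonymous proxy
        
        return 'elite'  # elite (high-anonymity) proxy

# Usage example
validator = ProxyValidator()
result = validator.validate_proxy('http://proxy-server:port')
print(result)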
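
The anonymity check above treats any forwarding header as a leak and compares header names case-sensitively. As a hedged variation, the sketch below normalizes header names and, when the caller's real IP is known, reports `transparent` only if that IP actually appears in a forwarding header. The function name `classify_anonymity` and the `real_ip` parameter are assumptions introduced for illustration.

```python
def classify_anonymity(echoed_headers, real_ip=None):
    """Classify a proxy from the request headers the target server echoed back."""
    # HTTP header names are case-insensitive, so normalize before comparing.
    seen = {name.lower(): value for name, value in echoed_headers.items()}

    ip_headers = ("x-forwarded-for", "x-real-ip", "x-originating-ip", "client-ip")
    proxy_headers = ("via", "x-proxy-id", "proxy-connection")

    forwarded = [seen[h] for h in ip_headers if h in seen]

    if real_ip is not None:
        # Forwarding headers may hold a comma-separated chain of addresses.
        leaked = any(
            part.strip() == real_ip
            for value in forwarded
            for part in value.split(",")
        )
        if leaked:
            return "transparent"
    elif forwarded:
        # Without a known real IP, fall back to treating any forwarding
        # header as a leak, like the check above.
        return "transparent"

    if forwarded or any(h in seen for h in proxy_headers):
        return "anonymous"
    return "elite"


print(classify_anonymity({"Via": "1.1 proxy"}))
print(classify_anonymity({"x-forwarded-for": "1.2.3.4, 9.9.9.9"}, real_ip="1.2.3.4"))
```

The real IP could be obtained once, without a proxy, from the same echo endpoint used for validation.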

2. Building a Proxy Pool

2.1 Basic Proxy Pool

import threading
from queue import Queue, Empty, Full

import requests

class ProxyPool:
    def __init__(self, max_size=100):
        self.max_size = max_size
        self.proxies = Queue(maxsize=max_size)
        self.bad_proxies = set()
        self.lock = threading.Lock()
        
        # Counters
        self.stats = {
            'total_proxies': 0,
            'valid_proxies': 0,
            'failed_proxies': 0,
            'requests_made': 0
        }
    
    def add_proxy(self, proxy_url):
        """Add a proxy; returns False if it is blacklisted or the pool is full."""
        if proxy_url in self.bad_proxies:
            return False
        try:
            self.proxies.put_nowait(proxy_url)
        except Full:
            return False
        with self.lock:
            self.stats['total_proxies'] += 1
        return True
    
    def get_proxy(self):
        """Take a proxy out of the pool, or return None if the pool is empty."""
        try:
            return self.proxies.get_nowait()
        except Empty:
            return None
    
    def return_proxy(self, proxy_url, success=True):
        """Put a working proxy back, or blacklist it after a failure."""
        if success:
            try:
                # Re-queue directly so total_proxies is not incremented again
                self.proxies.put_nowait(proxy_url)
            except Full:
                pass
            with self.lock:
                self.stats['valid_proxies'] += 1
        else:
            with self.lock:
                self.bad_proxies.add(proxy_url)
                self.stats['failed_proxies'] += 1
    
    def make_request(self, url, **kwargs):
        """Send a GET request through the pool, trying up to three proxies."""
        max_retries = 3
        
        for attempt in range(max_retries):
            proxy = self.get_proxy()
            if not proxy:
                raise RuntimeError("Proxy pool is empty")
            
            proxies = {
                'http': proxy,
                'https': proxy
            }
            
            try:
                response = requests.get(url, proxies=proxies, **kwargs)
            except requests.RequestException:
                self.return_proxy(proxy, success=False)
                if attempt == max_retries - 1:
                    raise
                continue
            
            self.return_proxy(proxy, success=True)
            with self.lock:
                self.stats['requests_made'] += 1
            return response
    
    def get_stats(self):
        """Return a copy of the counters."""
        with self.lock:
            return self.stats.copy()
    
    def size(self):
        """Return the number of proxies currently in the pool."""
        return self.proxies.qsize()

# Usage example
pool = ProxyPool()

# Add proxies (replace the placeholders with real hosts and ports)
proxy_list = [
    'http://proxy1:port',
    'http://proxy2:port',
    'http://proxy3:port'
]

for proxy in proxy_list:
    pool.add_proxy(proxy)

# Send a request through the pool
try:
    response = pool.make_request('https://httpbin.org/ip', timeout=10)
    print(response.json())
except Exception as e:
    print(f"Request failed: {e}")

print("Proxy pool stats:", pool.get_stats())
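
The basic pool blacklists a proxy after a single failure, which can be harsh for proxies that fail only occasionally. A minimal sketch of an alternative rotation policy follows: strict round-robin order, with a proxy dropped only after several consecutive failures. `RoundRobinProxies` and `max_failures` are hypothetical names, and the class is not thread-safe as written.

```python
from collections import deque


class RoundRobinProxies:
    """Rotate proxies in order; drop one after repeated consecutive failures."""

    def __init__(self, proxy_urls, max_failures=3):
        unique = list(dict.fromkeys(proxy_urls))  # dedupe, keep order
        self._queue = deque(unique)
        self._failures = {url: 0 for url in unique}
        self.max_failures = max_failures

    def next_proxy(self):
        """Return the next proxy in rotation, or None if none are left."""
        if not self._queue:
            return None
        proxy = self._queue[0]
        self._queue.rotate(-1)  # move the head to the tail
        return proxy

    def report(self, proxy, success):
        """Record the outcome of a request made through `proxy`."""
        if proxy not in self._failures:
            return  # already dropped or never known
        if success:
            self._failures[proxy] = 0  # a success resets the streak
            return
        self._failures[proxy] += 1
        if self._failures[proxy] >= self.max_failures:
            self._queue.remove(proxy)
            del self._failures[proxy]


rotation = RoundRobinProxies(["http://p1:8080", "http://p2:8080"], max_failures=2)
first = rotation.next_proxy()
rotation.report(first, success=False)
rotation.report(first, success=False)  # second consecutive failure drops it
print(rotation.next_proxy())
```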

2.2 Advanced Proxy Pool

import asyncio
import aiohttp
import time
import random
from dataclasses import dataclass
from typing import List, Optional
import logging

@dataclass
class ProxyInfo:
    url: str
    ip: str = ""
    port: int = 0
    protocol: str = "http"
    anonymity: str = "unknown"
    response_time: float = 0.0
    success_count: int = 0
    fail_count: int = 0
    last_used: float = 0.0
    last_checked: float = 0.0
    
    @property
    def success_rate(self):
        total = self.success_count + self.fail_count
        return self.success_count / total if total > 0 else 0.0
    
    @property
    def score(self):
        """Proxy score: shorter response times and higher success rates score higher."""
        if self.response_time == 0:
            return 0
        
        time_score = 1.0 / (self.response_time + 0.1)
        rate_score = self.success_rate
        
        return time_score * rate_score

class AdvancedProxyPool:
    def __init__(self, max_size=200, check_interval=300):
        self.max_size = max_size
        self.check_interval = check_interval
        self.proxies = {}  # url -> ProxyInfo
        self.lock = asyncio.Lock()
        
        # Logger
        self.logger = logging.getLogger(__name__)
        
        # Handle of the background check task (created by start_background_check)
        self.check_task = None
    
    async def add_proxy(self, proxy_url):
        """Add a proxy and parse its protocol, host, and port."""
        async with self.lock:
            # Reject duplicates and respect the configured pool size
            if proxy_url in self.proxies or len(self.proxies) >= self.max_size:
                return False
            
            proxy_info = ProxyInfo(url=proxy_url)
            
            # Parse the proxy URL
            if '://' in proxy_url:
                protocol, address = proxy_url.split('://', 1)
                proxy_info.protocol = protocol
                
                if '@' in address:
                    # Drop the "user:password@" part
                    address = address.split('@', 1)[1]
                
                if ':' in address:
                    ip, port = address.split(':', 1)
                    proxy_info.ip = ip
                    # Placeholders such as "port" are not numeric
                    if port.isdigit():
                        proxy_info.port = int(port)
            
            self.proxies[proxy_url] = proxy_info
            self.logger.info(f"Added proxy: {proxy_url}")
            
            return True
    
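    async def validate_proxy(self, proxy_info: ProxyInfo):
        """Validate a proxy and update its statistics."""
        # /get echoes the request headers, which check_anonymity() needs
        test_url = 'http://httpbin.org/get'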
        
        try:
            connector = aiohttp.TCPConnector(
                limit=10,
                ttl_dns_cache=300,
                use_dns_cache=True
            )
            
            timeout = aiohttp.ClientTimeout(total=15)
            
            async with aiohttp.ClientSession(
                connector=connector,
                timeout=timeout
            ) as session:
                
                start_time = time.time()
                
                async with session.get(
                    test_url,
                    proxy=proxy_info.url
                ) as response:
                    
                    end_time = time.time()
                    proxy_info.response_time = end_time - start_time
                    
                    if response.status == 200:
                        data = await response.json()
                        proxy_info.success_count += 1
                        proxy_info.last_checked = time.time()
                        
                        # Classify anonymity from the echoed headers
                        headers = data.get('headers', {})
                        proxy_info.anonymity = self.check_anonymity(headers)
                        
                        return True
                    else:
                        proxy_info.fail_count += 1
                        return False
                        
        except Exception as e:
            proxy_info.fail_count += 1
            self.logger.debug(f"Proxy validation failed: {proxy_info.url} - {e}")
            return False
    
    def check_anonymity(self, headers):
        """Classify the proxy's anonymity from the echoed request headers."""
        forwarded_headers = [
            'X-Forwarded-For', 'X-Real-Ip', 
            'X-Originating-Ip', 'Client-Ip'
        ]
        
        proxy_headers = [
            'Via', 'X-Proxy-Id', 'Proxy-Connection'
        ]
        
        for header in forwarded_headers:
            if header in headers:
                return 'transparent'
        
        for header in proxy_headers:
            if header in headers:
                return 'anonymous'
        
        return 'elite'
    
    async def get_best_proxy(self, min_score=0.1):
        """Pick one of the best-scoring proxies."""
        async with self.lock:
            valid_proxies = [
                proxy for proxy in self.proxies.values()
                if proxy.success_rate > 0.5 and proxy.score >= min_score
            ]
            
            if not valid_proxies:
                return None
            
            # Sort by score, best first
            valid_proxies.sort(key=lambda x: x.score, reverse=True)
            
            # Pick randomly among the top 10% so the same proxy is not used every time
            top_count = max(1, len(valid_proxies) // 10)
            selected = random.choice(valid_proxies[:top_count])
            
            selected.last_used = time.time()
            return selected
    
    async def make_request(self, url, **kwargs):
        """Send a GET request through the best available proxy."""
        max_retries = 3
        
        for attempt in range(max_retries):
            proxy_info = await self.get_best_proxy()
            
            if not proxy_info:
                raise RuntimeError("No usable proxy available")
            
            try:
                connector = aiohttp.TCPConnector(
                    limit=50,
                    ttl_dns_cache=300,
                    use_dns_cache=True
                )
                
                timeout = aiohttp.ClientTimeout(total=30)
                
                async with aiohttp.ClientSession(
                    connector=connector,
                    timeout=timeout
                ) as session:
                    
                    async with session.get(
                        url,
                        proxy=proxy_info.url,
                        **kwargs
                    ) as response:
                        
                        # Read the body now: once the session closes, the
                        # connection is gone and only the cached body remains,
                        # so callers can still use response.json() or .text()
                        await response.read()
                        
                        proxy_info.success_count += 1
                        return response
                        
            except Exception as e:
                proxy_info.fail_count += 1
                self.logger.warning(f"Request failed: {proxy_info.url} - {e}")
                
                if attempt == max_retries - 1:
                    raise
                
                continue
    
    async def check_all_proxies(self):
        """Validate every proxy and remove the ones that keep failing."""
        self.logger.info("Checking all proxies...")
        
        async with self.lock:
            proxy_list = list(self.proxies.values())
        
        # Validate concurrently, at most 20 proxies at a time
        semaphore = asyncio.Semaphore(20)
        
        async def check_proxy(proxy_info):
            async with semaphore:
                return await self.validate_proxy(proxy_info)
        
        tasks = [check_proxy(proxy) for proxy in proxy_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Remove dead proxies
        async with self.lock:
            to_remove = []
            for proxy_info in proxy_list:
                if proxy_info.success_rate < 0.1 and proxy_info.fail_count > 5:
                    to_remove.append(proxy_info.url)
            
            for url in to_remove:
                # pop() tolerates a proxy that was already removed elsewhere
                self.proxies.pop(url, None)
                self.logger.info(f"Removed dead proxy: {url}")
        
        valid_count = sum(1 for r in results if r is True)
        self.logger.info(f"Proxy check finished, valid proxies: {valid_count}/{len(proxy_list)}")
    
    async def start_background_check(self):
        """Start the periodic background check task."""
        async def check_loop():
            while True:
                try:
                    await self.check_all_proxies()
                    await asyncio.sleep(self.check_interval)
                except Exception as e:
                    self.logger.error(f"Background check failed: {e}")
                    await asyncio.sleep(60)
        
        self.check_task = asyncio.create_task(check_loop())
    
    async def stop_background_check(self):
        """Stop the background check task."""
        if self.check_task:
            self.check_task.cancel()
            try:
                await self.check_task
            except asyncio.CancelledError:
                pass
    
    async def get_stats(self):
        """Return summary statistics for the pool."""
        async with self.lock:
            total = len(self.proxies)
            valid = sum(1 for p in self.proxies.values() if p.success_rate > 0.5)
            
            avg_response_time = 0
            if total > 0:
                total_time = sum(p.response_time for p in self.proxies.values())
                avg_response_time = total_time / total
            
            return {
                'total_proxies': total,
                'valid_proxies': valid,
                'avg_response_time': avg_response_time,
                'anonymity_distribution': self.get_anonymity_stats()
            }
    
    def get_anonymity_stats(self):
        """Count proxies per anonymity level."""
        stats = {'elite': 0, 'anonymous': 0, 'transparent': 0, 'unknown': 0}
        
        for proxy in self.proxies.values():
            stats[proxy.anonymity] += 1
        
        return stats

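# Usage example
async def main():
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    
    pool = AdvancedProxyPool()
    
    # Add proxies (replace the placeholders with real hosts and ports)
    proxy_list = [
        'http://proxy1:port',
        'http://proxy2:port',
        'http://proxy3:port'
    ]
    
    for proxy in proxy_list:
        await pool.add_proxy(proxy)
    
    # Validate once up front: get_best_proxy() only returns proxies that
    # already have a measured response time and success rate
    await pool.check_all_proxies()
    
    # Start the periodic background checks
    await pool.start_background_check()
    
    try:
        # Send a request through the pool
        response = await pool.make_request('https://httpbin.org/ip')
        print(await response.json())
        
        # Fetch statistics
        stats = await pool.get_stats()
        print("Proxy pool stats:", stats)
        
    finally:
        await pool.stop_background_check()

# Run the example
# asyncio.run(main())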
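
The pool above scores each proxy as success rate divided by (response time + 0.1) and then picks uniformly among the top 10%. One possible alternative, sketched here under that same scoring formula, is to pick among all positively scored proxies with probability proportional to score, so weaker proxies still get occasional traffic. `proxy_score` and `pick_weighted` are hypothetical helper names, not part of the class above.

```python
import random


def proxy_score(response_time, success_count, fail_count):
    """Same formula as ProxyInfo.score: success_rate / (response_time + 0.1)."""
    total = success_count + fail_count
    if total == 0 or response_time <= 0:
        return 0.0
    return (success_count / total) / (response_time + 0.1)


def pick_weighted(scores, rng=random):
    """Pick a proxy URL with probability proportional to its score."""
    candidates = {url: s for url, s in scores.items() if s > 0}
    if not candidates:
        return None
    urls = list(candidates)
    weights = [candidates[url] for url in urls]
    return rng.choices(urls, weights=weights, k=1)[0]


scores = {
    "http://fast:8080": proxy_score(0.4, 9, 1),   # 0.9 / 0.5
    "http://slow:8080": proxy_score(0.9, 1, 1),   # 0.5 / 1.0
    "http://dead:8080": proxy_score(2.0, 0, 5),   # never succeeded
}
print(pick_weighted(scores))
```

With these numbers the fast proxy is chosen far more often than the slow one, and the proxy that never succeeded is never chosen.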

3. Getting Free Proxies

3.1 Scraping Proxy Listing Sites

import requests
from bs4 import BeautifulSoup
import re
import time
import random

class FreeProxyCollector:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
        self.proxy_sources = [
            {
                'name': 'free-proxy-list',
                'url': 'https://free-proxy-list.net/',
                'parser': self.parse_free_proxy_list
            },
            {
                'name': 'proxy-list',
                'url': 'https://www.proxy-list.download/api/v1/get?type=http',
                'parser': self.parse_proxy_list_download
            }
        ]
    
    def parse_free_proxy_list(self, html):
        """Parse the proxy table on free-proxy-list.net."""
        soup = BeautifulSoup(html, 'html.parser')
        proxies = []
        
        # The table id may change if the site is redesigned
        table = soup.find('table', {'id': 'proxylisttable'})
        if table:
            rows = table.find_all('tr')[1:]  # skip the header row
            
            for row in rows:
                cols = row.find_all('td')
                if len(cols) >= 7:
                    ip = cols[0].text.strip()
                    port = cols[1].text.strip()
                    country = cols[2].text.strip()
                    anonymity = cols[4].text.strip()
                    https = cols[6].text.strip()
                    
                    # Assumption: the "Https" column says whether the proxy can
                    # reach HTTPS targets; the proxy itself is still contacted
                    # over plain HTTP, so the proxy URL uses http://
                    protocol = 'https' if https == 'yes' else 'http'
                    proxy_url = f"http://{ip}:{port}"
                    
                    proxies.append({
                        'url': proxy_url,
                        'ip': ip,
                        'port': int(port),
                        'country': country,
                        'anonymity': anonymity.lower(),
                        'protocol': protocol
                    })
        
        return proxies
    
    def parse_proxy_list_download(self, content):
        """Parse the plain-text response of the proxy-list.download API."""
        proxies = []
        lines = content.strip().split('\n')
        
        for line in lines:
            if ':' in line:
                ip, port = line.strip().split(':', 1)
                proxy_url = f"http://{ip}:{port}"
                
                proxies.append({
                    'url': proxy_url,
                    'ip': ip,
                    'port': int(port),
                    'country': 'unknown',
                    'anonymity': 'unknown',
                    'protocol': 'http'
                })
        
        return proxies
    
    def collect_from_source(self, source):
        """Collect proxies from a single source."""
        try:
            print(f"Collecting proxies from {source['name']}")
            
            response = self.session.get(source['url'], timeout=15)
            response.raise_for_status()
            
            # Every parser takes the raw response text
            proxies = source['parser'](response.text)
            
            print(f"Collected {len(proxies)} proxies from {source['name']}")
            return proxies
            
        except Exception as e:
            print(f"Failed to collect from {source['name']}: {e}")
            return []
    
    def collect_all_proxies(self):
        """Collect proxies from every source and deduplicate them."""
        all_proxies = []
        
        for source in self.proxy_sources:
            proxies = self.collect_from_source(source)
            all_proxies.extend(proxies)
            
            # Random delay between sources to avoid getting blocked
            time.sleep(random.uniform(1, 3))
        
        # Deduplicate by proxy URL
        unique_proxies = {}
        for proxy in all_proxies:
            unique_proxies[proxy['url']] = proxy
        
        result = list(unique_proxies.values())
        print(f"Collected {len(result)} unique proxies in total")
        
        return result
    
    def validate_proxies(self, proxies, max_workers=20):
        """Validate a list of proxies concurrently."""
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        # ProxyValidator is the class defined in section 1.3
        validator = ProxyValidator(timeout=10)
        valid_proxies = []
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_proxy = {
                executor.submit(validator.validate_proxy, proxy['url']): proxy
                for proxy in proxies
            }
            
            for future in as_completed(future_to_proxy):
                proxy = future_to_proxy[future]
                try:
                    result = future.result()
                    if result['status'] == 'valid':
                        valid_proxies.append(result)
                        print(f"✓ Valid proxy: {result['proxy']} - {result['response_time']:.2f}s")
                    else:
                        print(f"✗ Invalid proxy: {result['proxy']}")
                except Exception as e:
                    print(f"✗ Validation error: {proxy['url']} - {e}")
        
        return valid_proxies

# Usage example
collector = FreeProxyCollector()

# Collect proxies
proxies = collector.collect_all_proxies()

# Validate proxies
valid_proxies = collector.validate_proxies(proxies[:50])  # only the first 50

print(f"Validation finished, valid proxies: {len(valid_proxies)}")

# Save the valid proxies
import json
with open('valid_proxies.json', 'w') as f:
    json.dump(valid_proxies, f, indent=2)
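
Both parsers above depend on the exact layout of one site. When a source simply lists `ip:port` pairs somewhere in its text, a more generic extraction can work. The sketch below is an assumption-laden illustration: `extract_proxies` is a hypothetical helper that finds IPv4 `ip:port` pairs with a regular expression, discards impossible addresses and ports, and deduplicates while keeping order.

```python
import ipaddress
import re

# Four dot-separated groups of 1-3 digits, a colon, then a 1-5 digit port
PROXY_PATTERN = re.compile(r"\b(\d{1,3}(?:\.\d{1,3}){3}):(\d{1,5})\b")


def extract_proxies(text):
    """Return unique http://ip:port URLs found in arbitrary text."""
    found = []
    seen = set()
    for ip, port in PROXY_PATTERN.findall(text):
        try:
            ipaddress.IPv4Address(ip)  # rejects octets above 255
        except ValueError:
            continue
        if not 1 <= int(port) <= 65535:
            continue
        url = f"http://{ip}:{port}"
        if url not in seen:
            seen.add(url)
            found.append(url)
    return found


sample = "1.2.3.4:8080\n999.1.1.1:80 bad\n5.6.7.8:70000\n1.2.3.4:8080\n10.0.0.1:3128"
print(extract_proxies(sample))
```

Extracted candidates still need to go through validation, since a syntactically valid address says nothing about whether the proxy works.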

3.2 Integrating Proxy APIs

import requests
import json
import time

class ProxyAPI:
    def __init__(self):
        self.apis = {
            'proxylist': {
                'url': 'https://www.proxy-list.download/api/v1/get',
                'params': {'type': 'http'},
                'parser': self.parse_text_list
            },
            'gimmeproxy': {
                'url': 'https://gimmeproxy.com/api/getProxy',
                'params': {'format': 'json', 'protocol': 'http'},
                'parser': self.parse_gimmeproxy
            }
        }
    
    def parse_text_list(self, content):
        """Parse a plain-text proxy list with one ip:port per line."""
        proxies = []
        lines = content.strip().split('\n')
        
        for line in lines:
            if ':' in line:
                ip, port = line.strip().split(':', 1)
                proxies.append(f"http://{ip}:{port}")
        
        return proxies
    
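    def parse_gimmeproxy(self, content):
        """Parse a gimmeproxy API response (one proxy per call)."""
        try:
            data = json.loads(content)
            proxy_url = f"{data['protocol']}://{data['ip']}:{data['port']}"
            return [proxy_url]
        except (ValueError, KeyError, TypeError):
            # Invalid JSON or an unexpected response shape
            return []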
    
    def get_proxies_from_api(self, api_name, count=10):
        """Fetch proxies from one API, calling it up to `count` times."""
        if api_name not in self.apis:
            raise ValueError(f"Unknown API: {api_name}")
        
        api_config = self.apis[api_name]
        proxies = []
        
        for i in range(count):
            try:
                response = requests.get(
                    api_config['url'],
                    params=api_config['params'],
                    timeout=10
                )
                
                if response.status_code == 200:
                    new_proxies = api_config['parser'](response.text)
                    proxies.extend(new_proxies)
                    
                    if api_name == 'gimmeproxy':
                        time.sleep(1)  # gimmeproxy is rate limited
                
            except Exception as e:
                print(f"API request failed: {api_name} - {e}")
                break
        
        return list(set(proxies))  # deduplicate
    
    def get_all_proxies(self, count_per_api=20):
        """Fetch proxies from every configured API."""
        all_proxies = []
        
        for api_name in self.apis:
            print(f"Fetching proxies from {api_name}...")
            proxies = self.get_proxies_from_api(api_name, count_per_api)
            all_proxies.extend(proxies)
            print(f"Got {len(proxies)} proxies from {api_name}")
        
        return list(set(all_proxies))

# Usage example
api = ProxyAPI()
proxies = api.get_all_proxies(count_per_api=10)
print(f"Got {len(proxies)} proxies in total")
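
The API client above waits a fixed second between gimmeproxy calls and gives up on the first error. For rate-limited APIs, a common alternative is to retry with exponentially growing, randomized delays. The following is a minimal sketch of that idea; `backoff_delay` and `call_with_retries` are hypothetical helpers, and the base delay and cap are arbitrary illustration values.

```python
import random
import time


def backoff_delay(attempt, base=1.0, cap=30.0):
    """Random delay in [0, min(cap, base * 2**attempt)] ("full jitter")."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


def call_with_retries(fetch, attempts=3, base=1.0, cap=30.0, sleep=time.sleep):
    """Call fetch() until it succeeds or `attempts` calls have failed."""
    for attempt in range(attempts):
        try:
            return fetch()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the last error
            sleep(backoff_delay(attempt, base, cap))


# Example with a fake fetch function that fails twice, then succeeds
calls = []

def flaky_fetch():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return ["http://1.2.3.4:8080"]

# sleep is replaced by a no-op so the example finishes immediately
print(call_with_retries(flaky_fetch, attempts=5, sleep=lambda seconds: None))
```

In real use, `fetch` would wrap a single `requests.get` call to the API, and the default `time.sleep` would be kept.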

4. Anti-Anti-Scraping Techniques

4.1 User-Agent Rotation

import random
import requests
import time

class UserAgentRotator:
    def __init__(self):
        self.user_agents = [
            # Chrome
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            
            # Firefox
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',
            
            # Safari
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Safari/605.1.15',
            
            # Edge
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36 Edg/90.0.818.66'
        ]
        
        self.mobile_agents = [
            # Safari on iPhone
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
            # Firefox on Android (rv and Gecko versions match the Firefox version)
            'Mozilla/5.0 (Android 11; Mobile; rv:88.0) Gecko/88.0 Firefox/88.0',
            # Chrome on Android
            'Mozilla/5.0 (Linux; Android 11; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36'
        ]
    
    def get_random_desktop_ua(self):
        """Return a random desktop User-Agent."""
        return random.choice(self.user_agents)
    
    def get_random_mobile_ua(self):
        """Return a random mobile User-Agent."""
        return random.choice(self.mobile_agents)
    
    def get_random_ua(self, mobile_ratio=0.3):
        """Return a random User-Agent; mobile_ratio is the share of mobile ones."""
        if random.random() < mobile_ratio:
            return self.get_random_mobile_ua()
        else:
            return self.get_random_desktop_ua()

class AntiDetectionSpider:
    def __init__(self, proxy_pool=None):
        self.ua_rotator = UserAgentRotator()
        self.proxy_pool = proxy_pool
        self.session = requests.Session()
        
        # Delay between requests, in seconds
        self.min_delay = 1
        self.max_delay = 3
        self.last_request_time = 0
    
    def get_headers(self, url=None):
        """Build a randomized set of request headers."""
        headers = {
            'User-Agent': self.ua_rotator.get_random_ua(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': random.choice([
                'en-US,en;q=0.9',
                'zh-CN,zh;q=0.9,en;q=0.8',
                'ja-JP,ja;q=0.9,en;q=0.8'
            ]),
            # 'br' is omitted: requests can only decode Brotli responses when
            # the optional brotli package is installed
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        
        # Optional headers
        optional_headers = {
            'Cache-Control': random.choice(['no-cache', 'max-age=0']),
            'Pragma': 'no-cache',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1'
        }
        
        # Include each optional header with 50% probability
        for key, value in optional_headers.items():
            if random.random() > 0.5:
                headers[key] = value
        
        # Send a Referer on roughly 70% of requests (here the requested URL itself)
        if url and random.random() > 0.3:
            headers['Referer'] = url
        
        return headers
    
    def wait_between_requests(self):
        """Sleep so that consecutive requests are a random interval apart."""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        
        min_interval = random.uniform(self.min_delay, self.max_delay)
        
        if elapsed < min_interval:
            sleep_time = min_interval - elapsed
            time.sleep(sleep_time)
        
        self.last_request_time = time.time()
    
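    def make_request(self, url, **kwargs):
        """Send a GET request with randomized headers and an optional proxy."""
        self.wait_between_requests()
        
        # Merge the generated headers into any caller-supplied ones
        headers = self.get_headers(url)
        kwargs.setdefault('headers', {}).update(headers)
        
        # Take a proxy from the pool, if one is configured
        proxy = None
        if self.proxy_pool:
            proxy = self.proxy_pool.get_proxy()
            if proxy:
                kwargs['proxies'] = {
                    'http': proxy,
                    'https': proxy
                }
        
        # Default timeout
        kwargs.setdefault('timeout', 15)
        
        try:
            response = self.session.get(url, **kwargs)
            
            # Check whether an anti-scraping mechanism was triggered
            if self.is_blocked(response):
                raise Exception("Request blocked; an anti-scraping mechanism was probably triggered")
            
        except Exception:
            # Mark the proxy as failed so the pool stops handing it out
            if proxy:
                self.proxy_pool.return_proxy(proxy, success=False)
            raise
        
        # Put the working proxy back; get_proxy() removed it from the pool
        if proxy:
            self.proxy_pool.return_proxy(proxy, success=True)
        
        return response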
    
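    def is_blocked(self, response):
        """Heuristically detect whether the response is a block page."""
        # Status codes commonly returned to blocked or rate-limited clients
        if response.status_code in [403, 429, 503]:
            return True
        
        # Keywords that often appear on block pages. These are rough
        # heuristics and can produce false positives on ordinary pages.
        content = response.text.lower()
        blocked_keywords = [
            'access denied',
            'blocked',
            'captcha',
            'verification',
            'are you a robot',  # a bare 'robot' would match any robots meta tag
            'bot detection',
            '访问被拒绝',  # "access denied"
            '验证码',  # "captcha"
            '机器人检测'  # "bot detection"
        ]
        
        for keyword in blocked_keywords:
            if keyword in content:
                return True
        
        # Very short responses may be block pages
        # (this also flags small but legitimate responses)
        if len(content) < 100:
            return True
        
        return False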

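# Usage example
spider = AntiDetectionSpider()

try:
    response = spider.make_request('https://httpbin.org/headers')
    print("Request succeeded")
    # httpbin echoes the request headers it received
    print("Headers seen by the server:", response.json())
except Exception as e:
    print(f"Request failed: {e}")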
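
The spider above draws a new User-Agent for every request, even though all requests share one `requests.Session` and its cookies. A site that correlates cookies or IPs with the User-Agent may find that combination inconsistent. One possible mitigation, sketched below as an assumption rather than a recommendation from the original lesson, is to derive a stable User-Agent from a session key such as the proxy URL. `sticky_user_agent` and the short `USER_AGENTS` list are illustrative names and values.

```python
import hashlib

# A short illustrative list; in practice this would be the rotator's full list
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
]


def sticky_user_agent(session_key, user_agents=USER_AGENTS):
    """Map a session key (e.g. a proxy URL) to one stable User-Agent."""
    # hashlib gives the same result on every run, unlike the built-in hash(),
    # which is randomized per process for strings.
    digest = hashlib.sha256(session_key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:4], "big") % len(user_agents)
    return user_agents[index]


# The same key always yields the same User-Agent
print(sticky_user_agent("http://proxy1:8080") == sticky_user_agent("http://proxy1:8080"))
```

Each proxy then presents a single browser identity for as long as it is in use, while different proxies still spread across the list.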

4.2 Request Header Optimization

import random
import time
from urllib.parse import urlparse

class HeaderOptimizer:
    def __init__(self):
        self.browser_versions = {
            'chrome': ['91.0.4472.124', '90.0.4430.212', '89.0.4389.128'],
            'firefox': ['89.0', '88.0', '87.0'],
            'safari': ['14.1.1', '14.0.3', '13.1.2'],
            'edge': ['91.0.864.59', '90.0.818.66', '89.0.774.68']
        }
        
        self.os_versions = {
            'windows': ['Windows NT 10.0; Win64; x64', 'Windows NT 6.1; Win64; x64'],
            'macos': ['Macintosh; Intel Mac OS X 10_15_7', 'Macintosh; Intel Mac OS X 10_14_6'],
            'linux': ['X11; Linux x86_64', 'X11; Ubuntu; Linux x86_64']
        }
    
    def generate_realistic_headers(self, target_url=None):
        """Generate realistic-looking browser request headers."""
        # Pick a browser and an operating system
        browser = random.choice(['chrome', 'firefox', 'safari', 'edge'])
        os_type = random.choice(['windows', 'macos', 'linux'])
        if browser == 'safari':
            os_type = 'macos'  # desktop Safari only ships on macOS
        
        # Build the User-Agent
        user_agent = self.generate_user_agent(browser, os_type)
        
        # Base headers
        headers = {
            'User-Agent': user_agent,
            'Accept': self.get_accept_header(browser),
            'Accept-Language': self.get_accept_language(),
            # requests can only decode 'br' when the brotli (or brotlicffi)
            # package is installed; drop 'br' here if it is not
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1'
        }
        
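        # Add Chrome-specific client hints
        if browser == 'chrome':
            # Take the major version from the User-Agent so both headers agree
            major = user_agent.split('Chrome/')[1].split('.')[0]
            platform = {'windows': 'Windows', 'macos': 'macOS', 'linux': 'Linux'}[os_type]
            headers.update({
                'sec-ch-ua': f'"Chromium";v="{major}", "Google Chrome";v="{major}", ";Not A Brand";v="99"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': f'"{platform}"'
            })
        
        # Add cache-control headers about half of the time
        if random.random() > 0.5:
            headers['Cache-Control'] = random.choice(['no-cache', 'max-age=0'])
            headers['Pragma'] = 'no-cache'
        
        # Add the DNT header about 70% of the time
        if random.random() > 0.3:
            headers['DNT'] = '1'
        
        # If a target URL is given, add headers that depend on it
        if target_url:
            parsed = urlparse(target_url)
            
            # Do not set Host by hand: the HTTP client derives it from the
            # request URL, and a mismatched Host breaks the request.
            
            # Add a Referer about 60% of the time
            if random.random() > 0.4:
                referers = [
                    'https://www.google.com/',
                    'https://www.bing.com/',
                    'https://www.baidu.com/',
                    f'https://{parsed.netloc}/'
                ]
                referer = random.choice(referers)
                headers['Referer'] = referer
                # Keep Sec-Fetch-Site consistent with the Referer
                same_host = urlparse(referer).netloc == parsed.netloc
                headers['Sec-Fetch-Site'] = 'same-origin' if same_host else 'cross-site'
        
        return headers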
    
    def generate_user_agent(self, browser, os_type):
        """Build a User-Agent string for the given browser and OS."""
        browser_version = random.choice(self.browser_versions[browser])
        os_version = random.choice(self.os_versions[os_type])
        
        if browser == 'chrome':
            webkit_version = '537.36'
            return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{browser_version} Safari/{webkit_version}'
        
        elif browser == 'firefox':
            return f'Mozilla/5.0 ({os_version}; rv:{browser_version}) Gecko/20100101 Firefox/{browser_version}'
        
        elif browser == 'safari':
            webkit_version = '605.1.15'
            return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Version/{browser_version} Safari/{webkit_version}'
        
        elif browser == 'edge':
            webkit_version = '537.36'
            chrome_version = '91.0.4472.124'  # Edge is Chromium-based
            return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{webkit_version} Edg/{browser_version}'
    
    def get_accept_header(self, browser):
        """Return the Accept header for the given browser."""
        accept_headers = {
            'chrome': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'firefox': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'safari': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'edge': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
        }
        
        return accept_headers.get(browser, accept_headers['chrome'])
    
    def get_accept_language(self):
        """Return a randomly chosen Accept-Language header."""
        languages = [
            'en-US,en;q=0.9',
            'zh-CN,zh;q=0.9,en;q=0.8',
            'ja-JP,ja;q=0.9,en;q=0.8',
            'ko-KR,ko;q=0.9,en;q=0.8',
            'de-DE,de;q=0.9,en;q=0.8',
            'fr-FR,fr;q=0.9,en;q=0.8'
        ]
        
        return random.choice(languages)

# Usage example
optimizer = HeaderOptimizer()

# Generate headers for the URL that will actually be requested
target = 'https://httpbin.org/headers'
headers = optimizer.generate_realistic_headers(target)
print("Generated headers:")
for key, value in headers.items():
    print(f"{key}: {value}")

# Send a request with the generated headers
import requests

response = requests.get(target, headers=headers, timeout=10)
print("\nHeaders received by the server:")
print(response.json())
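
Printing what the server received is only useful if it is compared with what was sent, since a proxy may add headers such as `Via` or drop others. The sketch below is one way to do that comparison. `diff_headers` is a hypothetical helper, not part of `HeaderOptimizer`, and it assumes both sides are plain name-to-value dicts.

```python
def diff_headers(sent: dict, echoed: dict) -> dict:
    """Compare sent headers with the headers a server echoed back.

    Header names are compared case-insensitively and reported in lowercase.
    """
    sent_l = {k.lower(): v for k, v in sent.items()}
    echoed_l = {k.lower(): v for k, v in echoed.items()}
    return {
        # Sent by the client but never seen by the server
        "missing": sorted(k for k in sent_l if k not in echoed_l),
        # Seen by the server with a different value
        "changed": sorted(
            k for k in sent_l if k in echoed_l and echoed_l[k] != sent_l[k]
        ),
        # Seen by the server but not sent by the client
        "added": sorted(k for k in echoed_l if k not in sent_l),
    }
```

With the example above this could be called as `diff_headers(headers, response.json()['headers'])`, because httpbin's `/headers` endpoint returns the echoed headers under the `headers` key. Expect a few entries under `added`, since the HTTP client and httpbin's own infrastructure add headers of their own.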

4.3 Session Management and Cookie Handling

import requests
import json
import time
import random
from http.cookiejar import LWPCookieJar

class SessionManager:
    def __init__(self, cookie_file=None):
        self.session = requests.Session()
        self.cookie_file = cookie_file
        
        # Use a file-backed cookie jar when a cookie file is given
        if cookie_file:
            self.session.cookies = LWPCookieJar(cookie_file)
            try:
                self.session.cookies.load(ignore_discard=True)
            except FileNotFoundError:
                pass  # first run: no saved cookies yet
        
        # Session-wide default headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        
        # Request history
        self.request_history = []
    
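    def save_cookies(self):
        """Save cookies to the cookie file."""
        if self.cookie_file:
            self.session.cookies.save(ignore_discard=True)
    
    def load_cookies(self):
        """Load cookies from the cookie file."""
        if self.cookie_file:
            try:
                self.session.cookies.load(ignore_discard=True)
                return True
            except FileNotFoundError:
                return False
        return False
    
    def clear_cookies(self):
        """Clear all cookies."""
        self.session.cookies.clear()
    
    def get_cookies_dict(self):
        """Return the cookies as a name -> value dict."""
        # dict(jar) only works for RequestsCookieJar; this helper also
        # handles the LWPCookieJar used when cookie_file is set
        return requests.utils.dict_from_cookiejar(self.session.cookies)
    
    def set_cookies(self, cookies_dict):
        """Set cookies from a name -> value dict."""
        for name, value in cookies_dict.items():
            # set_cookie() exists on every CookieJar; .set() is RequestsCookieJar-only
            self.session.cookies.set_cookie(requests.cookies.create_cookie(name, value))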
    
    def simulate_browser_behavior(self, url):
        """Simulate how a real browser reaches a page."""
        # Derive the site root (scheme://host) from the URL
        domain = '/'.join(url.split('/')[:3])
        
        try:
            # 1. Visit the home page first
            response = self.session.get(domain, timeout=10)
            time.sleep(random.uniform(1, 3))
            
            # 2. About half of the time, browse one more page
            if random.random() > 0.5:
                # Paths that commonly exist; a 404 here is harmless
                common_pages = ['/about', '/contact', '/help', '/sitemap']
                page = random.choice(common_pages)
                try:
                    self.session.get(f"{domain}{page}", timeout=10)
                    time.sleep(random.uniform(0.5, 2))
                except requests.RequestException:
                    pass  # the detour is optional; ignore network errors
            
            # 3. Finally, request the target page
            response = self.session.get(url, timeout=10)
            
            # Record the request in the history
            self.request_history.append({
                'url': url,
                'timestamp': time.time(),
                'status_code': response.status_code
            })
            
            return response
            
        except Exception as e:
            print(f"Failed to simulate browser behavior: {e}")
            return None
    
    def handle_login(self, login_url, username, password, 
                    username_field='username', password_field='password'):
        """Log in through a form and persist the resulting cookies."""
        try:
            # Fetch the login page (this also collects any pre-login cookies)
            response = self.session.get(login_url, timeout=10)
            
            # The page can be parsed here for a CSRF token and similar fields
            # soup = BeautifulSoup(response.text, 'html.parser')
            # csrf_token = soup.find('input', {'name': 'csrf_token'})['value']
            
            # Build the login form data
            login_data = {
                username_field: username,
                password_field: password
            }
            
            # Submit the login request
            response = self.session.post(login_url, data=login_data, timeout=10)
            
            # Check whether the login succeeded
            if self.is_login_successful(response):
                print("Login succeeded")
                self.save_cookies()
                return True
            else:
                print("Login failed")
                return False
                
        except Exception as e:
            print(f"Error during login: {e}")
            return False
    
    def is_login_successful(self, response):
        """Heuristically check whether a login succeeded."""
        # Check the status code
        if response.status_code != 200:
            return False
        
        # Check the final URL: still on a login page means the login failed
        if 'login' in response.url.lower():
            return False
        
        # Check the page content (the indicator lists are site-specific)
        content = response.text.lower()
        success_indicators = ['dashboard', 'profile', 'logout', '退出', '个人中心']
        failure_indicators = ['error', 'invalid', 'incorrect', '错误', '失败']
        
        has_success = any(indicator in content for indicator in success_indicators)
        has_failure = any(indicator in content for indicator in failure_indicators)
        
        return has_success and not has_failure
    
    def get_session_info(self):
        """Return a summary of the session state."""
        return {
            'cookies_count': len(self.session.cookies),
            # dict(jar) fails for LWPCookieJar, so convert with the requests helper
            'cookies': requests.utils.dict_from_cookiejar(self.session.cookies),
            'headers': dict(self.session.headers),
            'request_count': len(self.request_history)
        }

# Usage example
session_manager = SessionManager(cookie_file='session_cookies.txt')

# Visit the site the way a browser would
response = session_manager.simulate_browser_behavior('https://example.com/target-page')

if response:
    print(f"Request succeeded, status code: {response.status_code}")
    
    # Inspect the session state
    info = session_manager.get_session_info()
    print(f"Cookie count: {info['cookies_count']}")
    print(f"Request count: {info['request_count']}")
    
    # Save the cookies
    session_manager.save_cookies()
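
The commented-out lines in `handle_login` hint at reading a CSRF token from the login page before posting. A minimal sketch of that step using only the standard library is shown below. `HiddenInputParser` and `extract_hidden_fields` are hypothetical names, and the sketch assumes the token is delivered as a hidden `<input>` in the form, which is common but not universal.

```python
from html.parser import HTMLParser


class HiddenInputParser(HTMLParser):
    """Collect name/value pairs of <input type="hidden"> fields."""

    def __init__(self):
        super().__init__()
        self.fields = {}

    def handle_starttag(self, tag, attrs):
        if tag != "input":
            return
        attr = dict(attrs)
        # Attribute values can be None when written without a value
        is_hidden = (attr.get("type") or "").lower() == "hidden"
        if is_hidden and attr.get("name"):
            self.fields[attr["name"]] = attr.get("value") or ""


def extract_hidden_fields(html: str) -> dict:
    """Return every hidden form field in the page as a dict."""
    parser = HiddenInputParser()
    parser.feed(html)
    return parser.fields
```

Inside `handle_login`, something like `login_data.update(extract_hidden_fields(response.text))` before the POST would forward the token along with any other hidden fields. Sites that deliver the token through a cookie or a JavaScript call need a different approach.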

5. Case Study: E-commerce Product Crawler

import asyncio
import aiohttp
import json
import time
import random
from dataclasses import dataclass
from typing import List, Optional
import logging

@dataclass
class Product:
    title: str
    price: str
    url: str
    image_url: str = ""
    rating: str = ""
    reviews_count: str = ""
    seller: str = ""

class EcommerceSpider:
    def __init__(self, proxy_pool=None, max_concurrent=10):
        self.proxy_pool = proxy_pool
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
        # User agents to rotate through
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]
        
        # Crawl statistics
        self.stats = {
            'pages_crawled': 0,
            'products_found': 0,
            'errors': 0,
            'proxy_failures': 0
        }
        
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    async def get_session(self):
        """Create the aiohttp client session."""
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        
        timeout = aiohttp.ClientTimeout(total=30)
        
        return aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
    
    def get_headers(self):
        """Return request headers with a randomly chosen User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN,zh;q=0.8',
            # aiohttp needs the Brotli package to decode 'br' responses;
            # drop 'br' here if it is not installed
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
    
    async def fetch_with_retry(self, session, url, max_retries=3):
        """Fetch a URL, retrying on failure; return the body text or None."""
        async with self.semaphore:
            for attempt in range(max_retries):
                proxy = None
                
                # Pick a proxy for this attempt
                if self.proxy_pool:
                    proxy_info = await self.proxy_pool.get_best_proxy()
                    proxy = proxy_info.url if proxy_info else None
                
                try:
                    # Random delay before every attempt
                    await asyncio.sleep(random.uniform(1, 3))
                    
                    headers = self.get_headers()
                    
                    async with session.get(
                        url, 
                        headers=headers,
                        proxy=proxy
                    ) as response:
                        
                        if response.status == 200:
                            content = await response.text()
                            self.stats['pages_crawled'] += 1
                            
                            # Record the success against the proxy
                            if proxy and self.proxy_pool:
                                proxy_info.success_count += 1
                            
                            return content
                        
                        elif response.status in [403, 429, 503]:
                            # Probably blocked by anti-crawler measures
                            self.logger.warning(f"Request blocked: {url} - status code: {response.status}")
                            
                            if proxy and self.proxy_pool:
                                proxy_info.fail_count += 1
                                self.stats['proxy_failures'] += 1
                            
                            # Back off for longer before the next attempt
                            if attempt < max_retries - 1:
                                await asyncio.sleep(random.uniform(5, 10))
                                continue
                        
                        else:
                            self.logger.error(f"HTTP error: {url} - status code: {response.status}")
                
                except Exception as e:
                    self.logger.error(f"Request failed: {url} - {e}")
                    
                    if proxy and self.proxy_pool:
                        proxy_info.fail_count += 1
                        self.stats['proxy_failures'] += 1
                    
                    if attempt < max_retries - 1:
                        await asyncio.sleep(random.uniform(2, 5))
                        continue
            
            # Every attempt failed
            self.stats['errors'] += 1
            return None
    
    def parse_product_list(self, html, base_url):
        """Parse a product listing page into Product objects."""
        from urllib.parse import urljoin
        from bs4 import BeautifulSoup
        
        soup = BeautifulSoup(html, 'html.parser')
        products = []
        
        # Adjust the selectors to the structure of the actual site
        product_items = soup.select('.product-item')  # example selector
        
        for item in product_items:
            try:
                # Product title
                title_elem = item.select_one('.product-title')
                title = title_elem.get_text(strip=True) if title_elem else ""
                
                # Product price
                price_elem = item.select_one('.product-price')
                price = price_elem.get_text(strip=True) if price_elem else ""
                
                # Product link (urljoin resolves relative and protocol-relative URLs)
                link_elem = item.select_one('a')
                url = link_elem.get('href') if link_elem else ""
                if url:
                    url = urljoin(base_url, url)
                
                # Product image
                img_elem = item.select_one('img')
                image_url = img_elem.get('src') if img_elem else ""
                if image_url:
                    image_url = urljoin(base_url, image_url)
                
                # Rating
                rating_elem = item.select_one('.rating')
                rating = rating_elem.get_text(strip=True) if rating_elem else ""
                
                # Review count
                reviews_elem = item.select_one('.reviews-count')
                reviews_count = reviews_elem.get_text(strip=True) if reviews_elem else ""
                
                # Keep only items that have the required fields
                if title and price and url:
                    product = Product(
                        title=title,
                        price=price,
                        url=url,
                        image_url=image_url,
                        rating=rating,
                        reviews_count=reviews_count
                    )
                    products.append(product)
                    
            except Exception as e:
                self.logger.error(f"Failed to parse product: {e}")
                continue
        
        self.stats['products_found'] += len(products)
        return products
    
    async def crawl_search_results(self, keyword, max_pages=5):
        """Crawl the search result pages for a keyword."""
        from urllib.parse import urlencode
        
        all_products = []
        base_url = "https://example-shop.com"  # replace with the real site
        
        async with await self.get_session() as session:
            tasks = []
            
            # Create one task per result page
            for page in range(1, max_pages + 1):
                # urlencode percent-encodes non-ASCII keywords such as '手机'
                query = urlencode({'q': keyword, 'page': page})
                search_url = f"{base_url}/search?{query}"
                task = self.crawl_single_page(session, search_url, base_url)
                tasks.append(task)
            
            # Run all tasks concurrently
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Collect the results
            for result in results:
                if isinstance(result, list):
                    all_products.extend(result)
                elif isinstance(result, Exception):
                    self.logger.error(f"Page crawl failed: {result}")
        
        return all_products
    
    async def crawl_single_page(self, session, url, base_url):
        """Crawl a single page and return the products found on it."""
        html = await self.fetch_with_retry(session, url)
        
        if html:
            products = self.parse_product_list(html, base_url)
            self.logger.info(f"Page {url}: found {len(products)} products")
            return products
        else:
            self.logger.error(f"Could not fetch page: {url}")
            return []
    
    async def save_products(self, products, filename='products.json'):
        """Save the product data as JSON."""
        data = []
        for product in products:
            data.append({
                'title': product.title,
                'price': product.price,
                'url': product.url,
                'image_url': product.image_url,
                'rating': product.rating,
                'reviews_count': product.reviews_count,
                'seller': product.seller
            })
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"Saved {len(products)} products to {filename}")
    
    def print_stats(self):
        """Log the crawl statistics."""
        self.logger.info("=== Crawl statistics ===")
        self.logger.info(f"Pages crawled: {self.stats['pages_crawled']}")
        self.logger.info(f"Products found: {self.stats['products_found']}")
        self.logger.info(f"Errors: {self.stats['errors']}")
        self.logger.info(f"Proxy failures: {self.stats['proxy_failures']}")

# Usage example
async def main():
    # Create the proxy pool (optional)
    proxy_pool = AdvancedProxyPool()
    
    # Add some proxies (placeholders; replace with real host:port values)
    proxy_list = [
        'http://proxy1:port',
        'http://proxy2:port',
        'http://proxy3:port'
    ]
    
    for proxy in proxy_list:
        await proxy_pool.add_proxy(proxy)
    
    # Start the background health check
    await proxy_pool.start_background_check()
    
    try:
        # Create the spider
        spider = EcommerceSpider(proxy_pool=proxy_pool, max_concurrent=5)
        
        # Crawl products for the keyword '手机' (mobile phone)
        products = await spider.crawl_search_results('手机', max_pages=3)
        
        # Save the results
        await spider.save_products(products)
        
        # Print the statistics
        spider.print_stats()
        
    finally:
        await proxy_pool.stop_background_check()

# Run the spider
# asyncio.run(main())
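
The spider above bounds concurrency with a semaphore and collects results with `asyncio.gather`. That pattern can be tried without any network access, as in the sketch below. `crawl_all`, `demo`, and `fake_fetch` are hypothetical names introduced for illustration, and `fake_fetch` stands in for a real HTTP request.

```python
import asyncio


async def crawl_all(urls, fetch, max_concurrent=5):
    """Run fetch(url) for every URL with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(url):
        async with semaphore:
            return await fetch(url)

    # return_exceptions=True returns failures as values instead of
    # raising the first one, so every page gets a result slot
    return await asyncio.gather(*(bounded(u) for u in urls), return_exceptions=True)


async def demo():
    in_flight = 0
    peak = 0

    async def fake_fetch(url):
        # Stand-in for a real HTTP request; tracks concurrent calls
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        if url.endswith("3"):
            raise ValueError(f"simulated failure for {url}")
        return f"html of {url}"

    urls = [f"https://example-shop.com/search?page={n}" for n in range(1, 7)]
    results = await crawl_all(urls, fake_fetch, max_concurrent=2)
    return results, peak
```

Running `asyncio.run(demo())` returns the six results in page order, with the simulated failure in the third slot as an exception object, and `peak` shows how many fetches overlapped. Creating the semaphore inside the coroutine, rather than in `__init__`, avoids binding it to the wrong event loop on older Python versions.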

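6. Exercises

6.1 Basic Exercises

  1. Proxy validator: write a proxy validator that checks a proxy's availability, anonymity level, and response time
  2. User-agent rotation: implement a user-agent rotator that supports desktop and mobile UAs
  3. Header optimization: build a header generator that produces realistic browser request headers

6.2 Advanced Exercises

  1. Proxy pool management: build a complete proxy pool system covering proxy acquisition, validation, rotation, and eviction of dead proxies
  2. Anti-anti-crawler system: design an integrated system that combines proxies, UA rotation, request pacing, and related features
  3. Session management: implement a session manager that handles login, cookie management, and session persistence

6.3 Hands-on Project

Pick an e-commerce site and implement the following:

  1. Crawl product information through a proxy pool
  2. Detect anti-crawler blocking and respond to it
  3. Handle dynamically loaded content
  4. Clean and store the data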
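
For the data cleaning step of the project, a starting point might look like the sketch below. `parse_price` and `dedupe_by_url` are hypothetical helpers, and the price parser assumes that `.` is the decimal separator and `,` a thousands separator, which does not hold for every locale.

```python
import re
from decimal import Decimal
from typing import Optional


def parse_price(raw: str) -> Optional[Decimal]:
    """Extract a numeric price from text such as '¥1,299.00' or '$ 49.9'."""
    # Assumes '.' is the decimal separator and ',' a thousands separator
    match = re.search(r"\d[\d,]*(?:\.\d+)?", raw)
    if not match:
        return None
    return Decimal(match.group().replace(",", ""))


def dedupe_by_url(records: list) -> list:
    """Keep the first record for each URL, preserving order."""
    seen = set()
    unique = []
    for record in records:
        if record["url"] not in seen:
            seen.add(record["url"])
            unique.append(record)
    return unique
```

`Decimal` is used instead of `float` so that prices keep their exact decimal value when stored or summed. The same product often appears on several result pages, which is why deduplicating by URL before saving is worthwhile.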

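7. Frequently Asked Questions

Q1: How can I tell whether a proxy has been detected?

A: Look for the following signs:

  • Response status codes such as 403, 429, or 503
  • Response bodies that contain a CAPTCHA or a block message
  • Abnormal response times
  • Whether the IP address the server reports is the proxy's IP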
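
The last check can be sketched as a small classifier over what a header-echo endpoint reports. `classify_proxy` and `PROXY_HEADERS` are hypothetical names, and the three labels follow the informal convention used by proxy lists (transparent, anonymous, elite) rather than any standard.

```python
# Headers that commonly reveal a proxy (lowercase); an assumption, not exhaustive
PROXY_HEADERS = {"via", "x-forwarded-for", "forwarded", "proxy-connection"}


def _split_ips(value: str) -> set:
    """Split a comma-separated IP list into a set of trimmed entries."""
    return {part.strip() for part in value.split(",") if part.strip()}


def classify_proxy(real_ip: str, origin: str, headers: dict) -> str:
    """Classify a proxy from what the server saw.

    real_ip: the client's IP when no proxy is used
    origin:  the IP (or comma-separated IPs) the server reported
    headers: the request headers the server received
    """
    lowered = {k.lower(): v for k, v in headers.items()}
    visible = _split_ips(origin) | _split_ips(lowered.get("x-forwarded-for", ""))
    if real_ip in visible:
        return "transparent"  # the real IP leaked through
    if PROXY_HEADERS.intersection(lowered):
        return "anonymous"  # proxy use is visible, the real IP is not
    return "elite"  # no sign of a proxy in IP or headers
```

With httpbin, `/get` returns both an `origin` field and the received `headers`, so one proxied request plus one direct request to `/ip` supplies all three arguments. A test endpoint that sits behind its own load balancer may add forwarding headers itself, so results against a self-hosted echo server are more trustworthy.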

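Q2: Free proxies are unreliable. What can I do?

A: Suggestions:

  • Maintain a large proxy pool so that proxies can be rotated quickly
  • Check proxy availability continuously
  • Draw on multiple proxy sources
  • Consider a paid proxy service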

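Q3: How do I avoid getting my IP banned by a site?

A: Take the following measures:

  • Control the request rate
  • Rotate proxies
  • Simulate real user behavior
  • Randomize request parameters
  • Handle CAPTCHAs and logins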
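
Controlling the request rate is the simplest of these measures to implement. The sketch below enforces a minimum, jittered gap between requests to the same host. `DomainThrottle` is a hypothetical class, and the `clock` and `sleep` parameters exist only so the behavior can be tested without real waiting.

```python
import random
import time
from urllib.parse import urlparse


class DomainThrottle:
    """Enforce a minimum, jittered gap between requests to the same host."""

    def __init__(self, min_delay=1.0, jitter=0.5,
                 clock=time.monotonic, sleep=time.sleep):
        self.min_delay = min_delay
        self.jitter = jitter
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = {}  # host -> earliest time of the next request

    def wait(self, url):
        """Block until a request to url's host is allowed.

        Returns the number of seconds spent waiting.
        """
        host = urlparse(url).netloc
        now = self._clock()
        delay = max(0.0, self._next_allowed.get(host, 0.0) - now)
        if delay:
            self._sleep(delay)
        # Random jitter keeps the request rhythm from looking mechanical
        gap = self.min_delay + random.uniform(0, self.jitter)
        self._next_allowed[host] = now + delay + gap
        return delay
```

Calling `throttle.wait(url)` right before each `session.get(url)` is enough for a synchronous crawler. Tracking the delay per host means a slow, polite pace on one site does not hold up requests to another.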

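8. Next Lesson

The next lesson is "Python Crawler Lesson 9: CAPTCHA Recognition and Automated Handling". It covers:

  • CAPTCHA types and recognition techniques
  • OCR text recognition
  • Image CAPTCHA handling
  • Slider CAPTCHA solving
  • Third-party CAPTCHA services

This lesson covered the core techniques for building a proxy pool and countering anti-crawler measures. These skills will help you handle a range of anti-crawler defenses and make your crawlers more stable and more likely to succeed.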



Author: 一晌小贪欢
Published: 2025-10-23
License: CC BY-NC-SA 4.0