Python Web Scraping Lesson 7: Multithreading and Asynchronous Crawling Techniques
2025-10-23
  • 🌸 Welcome to the Python Office Automation column — use Python to handle office tasks and free up your hands
  • 💻 Personal homepage ——> personal homepage, visits welcome
  • 😸 Github homepage ——> Github homepage, visits welcome
  • ❓ Zhihu homepage ——> Zhihu homepage, visits welcome
  • 🏳️‍🌈 CSDN blog homepage: please click ——> 一晌小贪欢's blog homepage, a follow is appreciated
  • 👍 Column for this series: please click ——> Python Office Automation column, subscriptions appreciated
  • 🕷 There is also a web scraping column: please click ——> Python Web Scraping Basics column, subscriptions appreciated
  • 📕 There is also a Python basics column: please click ——> Python Basics Learning column, subscriptions appreciated
  • The author's skill and knowledge are limited; if you spot mistakes in this article, corrections are welcome 🙏
  • ❤️ Thanks to everyone for following! ❤️

## Course Objectives
- Understand the basic concepts of concurrent programming
- Learn how to implement multithreaded crawlers
- Learn to use asynchronous programming to improve crawler efficiency
- Understand concurrency control and resource management

## 1. Concurrency Basics

### 1.1 Why Do We Need Concurrent Crawlers?
- Network I/O is the main bottleneck for a crawler
- A single-threaded crawler spends most of its time waiting for network responses (a quick illustration follows below)
- Concurrency can significantly improve crawling efficiency
- It makes fuller use of system resources
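
A quick illustration of the point above: Python's GIL is released while a thread waits on I/O, so the waits can overlap even though only one thread runs Python bytecode at a time. The sketch below uses `time.sleep` as a stand-in for network latency; the timings are indicative only.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_request(_):
    """Simulate a network call: the GIL is released while sleeping."""
    time.sleep(1)

# Sequential: roughly 5 seconds for 5 "requests"
start = time.time()
for i in range(5):
    fake_request(i)
print(f"sequential: {time.time() - start:.2f}s")

# Threaded: the five 1-second waits overlap, roughly 1 second total
start = time.time()
with ThreadPoolExecutor(max_workers=5) as pool:
    list(pool.map(fake_request, range(5)))
print(f"threaded:   {time.time() - start:.2f}s")
```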

### 1.2 Comparing Concurrency Approaches
```python
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import asyncio
import aiohttp

# List of test URLs
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1'
]

def fetch_sync(url):
    """Synchronous request"""
    response = requests.get(url)
    return response.status_code

def test_sync():
    """Benchmark the synchronous crawler"""
    start_time = time.time()
    
    results = []
    for url in urls:
        result = fetch_sync(url)
        results.append(result)
    
    end_time = time.time()
    print(f"Synchronous crawler took {end_time - start_time:.2f}s")
    return results

def test_threading():
    """Benchmark the multithreaded crawler"""
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_sync, urls))
    
    end_time = time.time()
    print(f"Multithreaded crawler took {end_time - start_time:.2f}s")
    return results

async def fetch_async(session, url):
    """Asynchronous request"""
    async with session.get(url) as response:
        return response.status

async def test_async():
    """Benchmark the asynchronous crawler"""
    start_time = time.time()
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    end_time = time.time()
    print(f"Asynchronous crawler took {end_time - start_time:.2f}s")
    return results

# Run the benchmarks
if __name__ == "__main__":
    test_sync()      # roughly 5 seconds
    test_threading() # roughly 1 second
    asyncio.run(test_async())  # roughly 1 second
```

## 2. Multithreaded Crawlers

### 2.1 A Basic Multithreaded Crawler

```python
import threading
import requests
from queue import Queue
import time

class ThreadSpider:
    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.url_queue = Queue()
        self.result_queue = Queue()
        self.session = requests.Session()
        
        # Set request headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
    
    def worker(self):
        """Worker thread function"""
        while True:
            url = self.url_queue.get()
            if url is None:
                break
            
            try:
                response = self.session.get(url, timeout=10)
                result = {
                    'url': url,
                    'status_code': response.status_code,
                    'content': response.text,
                    'headers': dict(response.headers)
                }
                self.result_queue.put(result)
                print(f"✓ Crawled successfully: {url}")
                
            except Exception as e:
                error_result = {
                    'url': url,
                    'error': str(e)
                }
                self.result_queue.put(error_result)
                print(f"✗ Failed to crawl: {url} - {e}")
            
            finally:
                self.url_queue.task_done()
    
    def start_workers(self):
        """Start the worker threads"""
        self.threads = []
        for i in range(self.max_workers):
            thread = threading.Thread(target=self.worker)
            thread.daemon = True
            thread.start()
            self.threads.append(thread)
    
    def add_urls(self, urls):
        """Add URLs to the queue"""
        for url in urls:
            self.url_queue.put(url)
    
    def crawl(self, urls):
        """Start crawling"""
        print(f"Crawling {len(urls)} URLs with {self.max_workers} threads")
        
        # Start the worker threads
        self.start_workers()
        
        # Add the URLs
        self.add_urls(urls)
        
        # Wait for all tasks to finish
        self.url_queue.join()
        
        # Stop the worker threads
        for _ in range(self.max_workers):
            self.url_queue.put(None)
        
        for thread in self.threads:
            thread.join()
        
        # Collect the results
        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get())
        
        return results

# Usage example
urls = [
    'https://httpbin.org/get',
    'https://httpbin.org/user-agent',
    'https://httpbin.org/headers',
    'https://httpbin.org/ip',
    'https://httpbin.org/uuid'
]

spider = ThreadSpider(max_workers=3)
results = spider.crawl(urls)

for result in results:
    if 'error' in result:
        print(f"Error: {result['url']} - {result['error']}")
    else:
        print(f"Success: {result['url']} - status code: {result['status_code']}")
```

### 2.2 Using ThreadPoolExecutor

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

class AdvancedThreadSpider:
    def __init__(self, max_workers=5, timeout=10):
        self.max_workers = max_workers
        self.timeout = timeout
        self.session = requests.Session()
        
        # Configure the session
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
        # Configure the connection pool
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_workers,
            pool_maxsize=max_workers * 2,
            max_retries=3
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
    
    def fetch_url(self, url):
        """Fetch a single URL"""
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=self.timeout)
            end_time = time.time()
            
            return {
                'url': url,
                'status_code': response.status_code,
                'content_length': len(response.content),
                'response_time': end_time - start_time,
                'content': response.text,
                'success': True
            }
            
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'success': False
            }
    
    def crawl_urls(self, urls, callback=None):
        """Crawl a list of URLs"""
        results = []
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_url = {
                executor.submit(self.fetch_url, url): url 
                for url in urls
            }
            
            # Handle tasks as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    
                    # Invoke the callback
                    if callback:
                        callback(result)
                        
                except Exception as e:
                    error_result = {
                        'url': url,
                        'error': str(e),
                        'success': False
                    }
                    results.append(error_result)
                    
                    if callback:
                        callback(error_result)
        
        return results
    
    def crawl_with_progress(self, urls):
        """Crawl with progress display"""
        total = len(urls)
        completed = 0
        
        def progress_callback(result):
            nonlocal completed
            completed += 1
            if result['success']:
                print(f"[{completed}/{total}] ✓ {result['url']} - {result['response_time']:.2f}s")
            else:
                print(f"[{completed}/{total}] ✗ {result['url']} - {result['error']}")
        
        return self.crawl_urls(urls, callback=progress_callback)

# Usage example
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/404',
    'https://httpbin.org/json'
] * 3  # repeated 3 times, 15 URLs in total

spider = AdvancedThreadSpider(max_workers=5)
start_time = time.time()
results = spider.crawl_with_progress(urls)
end_time = time.time()

print(f"\nTotal time: {end_time - start_time:.2f}s")
print(f"Succeeded: {sum(1 for r in results if r['success'])}")
print(f"Failed: {sum(1 for r in results if not r['success'])}")
```

## 3. Asynchronous Crawlers

### 3.1 A Basic Async Crawler

```python
import asyncio
import aiohttp
import time
from aiohttp import ClientTimeout, ClientSession

class AsyncSpider:
    def __init__(self, max_concurrent=10, timeout=10):
        self.max_concurrent = max_concurrent
        self.timeout = ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
        # Connector configuration
        # Note: the ClientSession created below owns this connector and closes it
        # on exit, so create a fresh AsyncSpider for each crawl run.
        self.connector = aiohttp.TCPConnector(
            limit=100,  # total connection pool size
            limit_per_host=max_concurrent,  # connections per host
            ttl_dns_cache=300,  # DNS cache TTL in seconds
            use_dns_cache=True,
        )
    
    async def fetch_url(self, session, url):
        """Fetch a single URL asynchronously"""
        async with self.semaphore:  # limit concurrency
            try:
                start_time = time.time()
                async with session.get(url) as response:
                    content = await response.text()
                    end_time = time.time()
                    
                    return {
                        'url': url,
                        'status_code': response.status,
                        'content_length': len(content),
                        'response_time': end_time - start_time,
                        'content': content,
                        'success': True
                    }
                    
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'success': False
                }
    
    async def crawl_urls(self, urls):
        """Crawl a list of URLs asynchronously"""
        async with ClientSession(
            timeout=self.timeout,
            connector=self.connector,
            headers={'User-Agent': 'AsyncSpider/1.0'}
        ) as session:
            
            # Create all tasks
            tasks = [self.fetch_url(session, url) for url in urls]
            
            # Wait for all tasks to finish
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Handle exception results
            processed_results = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    processed_results.append({
                        'url': urls[i],
                        'error': str(result),
                        'success': False
                    })
                else:
                    processed_results.append(result)
            
            return processed_results
    
    async def crawl_with_progress(self, urls):
        """Asynchronous crawl with progress display"""
        total = len(urls)
        completed = 0
        results = []
        
        async def fetch_with_progress(session, url):
            nonlocal completed
            result = await self.fetch_url(session, url)
            completed += 1
            
            if result['success']:
                print(f"[{completed}/{total}] ✓ {result['url']} - {result['response_time']:.2f}s")
            else:
                print(f"[{completed}/{total}] ✗ {result['url']} - {result['error']}")
            
            return result
        
        async with ClientSession(
            timeout=self.timeout,
            connector=self.connector,
            headers={'User-Agent': 'AsyncSpider/1.0'}
        ) as session:
            
            tasks = [fetch_with_progress(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        
        return results

# Usage example
async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/json',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers'
    ] * 4  # repeated 4 times, 20 URLs in total
    
    spider = AsyncSpider(max_concurrent=5)
    
    start_time = time.time()
    results = await spider.crawl_with_progress(urls)
    end_time = time.time()
    
    print(f"\nTotal time: {end_time - start_time:.2f}s")
    print(f"Succeeded: {sum(1 for r in results if r['success'])}")
    print(f"Failed: {sum(1 for r in results if not r['success'])}")

# Run the async crawler
asyncio.run(main())
```
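
A small practical note on running these examples: `asyncio.run()` starts a brand-new event loop, so it raises a RuntimeError in environments that already run one, such as Jupyter or IPython. A minimal sketch of both entry points, reusing the `AsyncSpider` class above with an illustrative URL:

```python
import asyncio

async def run_once():
    # One spider per run, since its connector is closed when the session exits
    spider = AsyncSpider(max_concurrent=5)
    return await spider.crawl_urls(['https://httpbin.org/get'])

# Plain script: start an event loop
results = asyncio.run(run_once())

# Jupyter / IPython (a loop is already running): await the coroutine directly instead
#     results = await run_once()
```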

### 3.2 An Advanced Async Crawler

```python
import asyncio
import aiohttp
import aiofiles
import json
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class CrawlResult:
    url: str
    status_code: Optional[int] = None
    content: Optional[str] = None
    error: Optional[str] = None
    response_time: Optional[float] = None
    success: bool = False
    processed_data: Optional[dict] = None

class AdvancedAsyncSpider:
    def __init__(self, 
                 max_concurrent=20,
                 timeout=30,
                 retry_times=3,
                 retry_delay=1):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.retry_times = retry_times
        self.retry_delay = retry_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
        # Statistics
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'start_time': None,
            'end_time': None
        }
    
    async def fetch_with_retry(self, session, url):
        """Asynchronous request with retries"""
        for attempt in range(self.retry_times + 1):
            try:
                start_time = time.time()
                async with session.get(url) as response:
                    content = await response.text()
                    end_time = time.time()
                    
                    return CrawlResult(
                        url=url,
                        status_code=response.status,
                        content=content,
                        response_time=end_time - start_time,
                        success=True
                    )
                    
            except Exception as e:
                if attempt < self.retry_times:
                    await asyncio.sleep(self.retry_delay * (attempt + 1))
                    continue
                else:
                    return CrawlResult(
                        url=url,
                        error=str(e),
                        success=False
                    )
    
    async def process_url(self, session, url, processor=None):
        """Process a single URL"""
        async with self.semaphore:
            result = await self.fetch_with_retry(session, url)
            
            # Update statistics
            if result.success:
                self.stats['success'] += 1
            else:
                self.stats['failed'] += 1
            
            # Custom processor
            if processor and result.success:
                try:
                    result.processed_data = await processor(result)
                except Exception as e:
                    result.error = f"Processor error: {e}"
                    result.success = False
            
            return result
    
    async def crawl_batch(self, urls, processor=None, progress_callback=None):
        """Crawl a batch of URLs"""
        self.stats['total'] = len(urls)
        self.stats['start_time'] = time.time()
        
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        
        async with aiohttp.ClientSession(
            timeout=self.timeout,
            connector=connector,
            headers={
                'User-Agent': 'AdvancedAsyncSpider/1.0',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
            }
        ) as session:
            
            # Create the tasks
            tasks = []
            for url in urls:
                task = self.process_url(session, url, processor)
                tasks.append(task)
            
            # Run the tasks and collect the results
            results = []
            for coro in asyncio.as_completed(tasks):
                result = await coro
                results.append(result)
                
                # Progress callback
                if progress_callback:
                    await progress_callback(result, len(results), len(urls))
        
        self.stats['end_time'] = time.time()
        return results
    
    async def save_results(self, results, filename='results.json'):
        """Save the results to a file"""
        data = []
        for result in results:
            item = {
                'url': result.url,
                'status_code': result.status_code,
                'success': result.success,
                'response_time': result.response_time,
                'error': result.error
            }
            
            if result.processed_data is not None:
                item['processed_data'] = result.processed_data
            
            data.append(item)
        
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        
        print(f"Results saved to {filename}")
    
    def print_stats(self):
        """Print crawl statistics"""
        if self.stats['start_time'] and self.stats['end_time']:
            duration = self.stats['end_time'] - self.stats['start_time']
            rate = self.stats['total'] / duration if duration > 0 else 0
            
            print("\n=== Crawl statistics ===")
            print(f"Total: {self.stats['total']}")
            print(f"Succeeded: {self.stats['success']}")
            print(f"Failed: {self.stats['failed']}")
            print(f"Elapsed: {duration:.2f}s")
            print(f"Rate: {rate:.2f} URLs/s")

# Example custom processor
async def json_processor(result: CrawlResult):
    """JSON data processor"""
    try:
        data = json.loads(result.content)
        return {
            'type': 'json',
            'keys': list(data.keys()) if isinstance(data, dict) else None,
            'length': len(data) if isinstance(data, (list, dict)) else None
        }
    except (json.JSONDecodeError, TypeError):
        return {'type': 'text', 'length': len(result.content)}

async def progress_callback(result, completed, total):
    """Progress callback function"""
    percentage = (completed / total) * 100
    status = "✓" if result.success else "✗"
    print(f"[{completed}/{total}] {status} {result.url} ({percentage:.1f}%)")

# Usage example
async def main():
    urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
        'https://httpbin.org/uuid'
    ] * 10  # 50 URLs
    
    spider = AdvancedAsyncSpider(max_concurrent=10)
    
    results = await spider.crawl_batch(
        urls, 
        processor=json_processor,
        progress_callback=progress_callback
    )
    
    spider.print_stats()
    await spider.save_results(results)

# Run
asyncio.run(main())
```

## 4. Concurrency Control and Resource Management

### 4.1 Rate Limiting

```python
import asyncio
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_calls, time_window):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        """Acquire permission to make a call"""
        while True:
            async with self.lock:
                now = time.time()
                
                # Drop call records that fall outside the time window
                while self.calls and self.calls[0] <= now - self.time_window:
                    self.calls.popleft()
                
                # Under the limit: record this call and proceed
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                
                # Over the limit: work out how long to wait before retrying
                sleep_time = self.time_window - (now - self.calls[0])
            
            # Sleep outside the lock so other coroutines are not blocked while we wait
            await asyncio.sleep(max(sleep_time, 0))

class RateLimitedSpider:
    def __init__(self, max_concurrent=10, rate_limit=None):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = rate_limit
    
    async def fetch_url(self, session, url):
        """Request with rate limiting applied"""
        async with self.semaphore:
            # Apply the rate limit
            if self.rate_limiter:
                await self.rate_limiter.acquire()
            
            try:
                async with session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Request failed: {url} - {e}")
                return None

# Usage example: at most 5 requests per second
rate_limiter = RateLimiter(max_calls=5, time_window=1.0)
spider = RateLimitedSpider(max_concurrent=10, rate_limit=rate_limiter)
```
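
The example above only builds the limiter and the spider. A minimal hedged sketch of actually driving them, assuming the `RateLimiter` and `RateLimitedSpider` classes above and an illustrative URL list:

```python
import asyncio
import aiohttp

async def run_rate_limited_crawl():
    # At most 5 requests per second, up to 10 in flight at once
    limiter = RateLimiter(max_calls=5, time_window=1.0)
    spider = RateLimitedSpider(max_concurrent=10, rate_limit=limiter)

    urls = ['https://httpbin.org/get'] * 20  # illustrative targets

    async with aiohttp.ClientSession() as session:
        tasks = [spider.fetch_url(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)

    print(f"Fetched {sum(1 for p in pages if p is not None)} of {len(urls)} pages")

asyncio.run(run_rate_limited_crawl())
```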

### 4.2 Connection Pool Management

```python
import aiohttp
import asyncio

class ConnectionPoolManager:
    def __init__(self, 
                 total_connections=100,
                 connections_per_host=10,
                 keepalive_timeout=30):
        
        self.connector = aiohttp.TCPConnector(
            limit=total_connections,
            limit_per_host=connections_per_host,
            keepalive_timeout=keepalive_timeout,
            enable_cleanup_closed=True,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={
                'User-Agent': 'ConnectionPoolSpider/1.0'
            }
        )
        return self.session
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            # Closing the session also closes the connector it owns
            await self.session.close()
        await self.connector.close()

# Usage example
async def main():
    urls = ['https://httpbin.org/get'] * 100
    
    async with ConnectionPoolManager() as session:
        tasks = []
        for url in urls:
            task = session.get(url)
            tasks.append(task)
        
        responses = await asyncio.gather(*tasks)
        
        for response in responses:
            print(f"Status code: {response.status}")
            response.close()

asyncio.run(main())
```

### 4.3 Memory Management

```python
import asyncio
import aiohttp
import gc
import psutil
import os

class MemoryManagedSpider:
    def __init__(self, max_memory_mb=500):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process(os.getpid())
    
    def get_memory_usage(self):
        """Get the current memory usage in MB"""
        return self.process.memory_info().rss / 1024 / 1024
    
    async def check_memory(self):
        """Check memory usage"""
        memory_usage = self.get_memory_usage()
        if memory_usage > self.max_memory_mb:
            print(f"Memory usage is high: {memory_usage:.1f}MB, running garbage collection")
            gc.collect()
            await asyncio.sleep(0.1)  # give garbage collection a moment to finish
            
            new_usage = self.get_memory_usage()
            print(f"Memory usage after garbage collection: {new_usage:.1f}MB")
    
    async def fetch_with_memory_check(self, session, url):
        """Request with a memory check"""
        await self.check_memory()
        
        try:
            async with session.get(url) as response:
                content = await response.text()
                # Release the large object as soon as it has been processed
                result = len(content)
                del content
                return result
        except Exception:
            return None

# Usage example
async def main():
    spider = MemoryManagedSpider(max_memory_mb=200)
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(1000):
            url = f'https://httpbin.org/bytes/{1024 * 10}'  # 10KB of data
            task = spider.fetch_with_memory_check(session, url)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        print(f"Processed {len([r for r in results if r])} requests")

asyncio.run(main())
```

## 5. Hands-On Example: A Concurrent News Site Crawler

```python
import asyncio
import aiohttp
import aiofiles
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class NewsArticle:
    title: str
    url: str
    content: str
    publish_time: Optional[str] = None
    author: Optional[str] = None
    category: Optional[str] = None

class NewsSpider:
    def __init__(self, max_concurrent=20, delay=1):
        self.max_concurrent = max_concurrent
        self.delay = delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.articles = []
        
        # Statistics
        self.stats = {
            'pages_crawled': 0,
            'articles_found': 0,
            'errors': 0
        }
    
    async def fetch_page(self, session, url):
        """Fetch a page"""
        async with self.semaphore:
            try:
                await asyncio.sleep(self.delay)  # delay between requests
                
                async with session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        self.stats['pages_crawled'] += 1
                        return content
                    else:
                        print(f"HTTP error {response.status}: {url}")
                        self.stats['errors'] += 1
                        return None
                        
            except Exception as e:
                print(f"Request failed: {url} - {e}")
                self.stats['errors'] += 1
                return None
    
    def parse_article_list(self, html, base_url):
        """Parse an article list page"""
        soup = BeautifulSoup(html, 'html.parser')
        article_links = []
        
        # Adjust the selector to match the target site's structure
        for link in soup.select('a[href*="/article/"]'):
            href = link.get('href')
            if href:
                full_url = urljoin(base_url, href)
                article_links.append(full_url)
        
        return article_links
    
    def parse_article_detail(self, html, url):
        """Parse an article detail page"""
        soup = BeautifulSoup(html, 'html.parser')
        
        try:
            # Adjust the selectors to match the target site's structure
            title = soup.select_one('h1.article-title')
            title = title.get_text(strip=True) if title else "Untitled"
            
            content_elem = soup.select_one('.article-content')
            content = content_elem.get_text(strip=True) if content_elem else ""
            
            author_elem = soup.select_one('.article-author')
            author = author_elem.get_text(strip=True) if author_elem else None
            
            time_elem = soup.select_one('.publish-time')
            publish_time = time_elem.get_text(strip=True) if time_elem else None
            
            category_elem = soup.select_one('.article-category')
            category = category_elem.get_text(strip=True) if category_elem else None
            
            return NewsArticle(
                title=title,
                url=url,
                content=content,
                author=author,
                publish_time=publish_time,
                category=category
            )
            
        except Exception as e:
            print(f"Failed to parse article: {url} - {e}")
            return None
    
    async def crawl_article_list(self, session, list_url):
        """Crawl an article list page"""
        html = await self.fetch_page(session, list_url)
        if html:
            return self.parse_article_list(html, list_url)
        return []
    
    async def crawl_article_detail(self, session, article_url):
        """Crawl an article detail page"""
        html = await self.fetch_page(session, article_url)
        if html:
            article = self.parse_article_detail(html, article_url)
            if article:
                self.articles.append(article)
                self.stats['articles_found'] += 1
                print(f"✓ Crawled article: {article.title}")
                return article
        return None
    
    async def crawl_news_site(self, base_url, max_pages=5):
        """Crawl a news site"""
        print(f"Crawling news site: {base_url}")
        
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent
        )
        
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        ) as session:
            
            # Step 1: collect the article lists
            article_urls = set()
            
            list_tasks = []
            for page in range(1, max_pages + 1):
                list_url = f"{base_url}/news/page/{page}"
                task = self.crawl_article_list(session, list_url)
                list_tasks.append(task)
            
            # Gather all article links
            list_results = await asyncio.gather(*list_tasks)
            for urls in list_results:
                article_urls.update(urls)
            
            print(f"Found {len(article_urls)} articles")
            
            # Step 2: crawl the article details
            detail_tasks = []
            for url in article_urls:
                task = self.crawl_article_detail(session, url)
                detail_tasks.append(task)
            
            # Process in batches to keep memory usage bounded
            batch_size = 50
            for i in range(0, len(detail_tasks), batch_size):
                batch = detail_tasks[i:i + batch_size]
                await asyncio.gather(*batch)
                print(f"Processed {min(i + batch_size, len(detail_tasks))}/{len(detail_tasks)} articles")
    
    async def save_articles(self, filename='news_articles.json'):
        """Save the articles to a file"""
        data = [asdict(article) for article in self.articles]
        
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        
        print(f"Saved {len(self.articles)} articles to {filename}")
    
    def print_stats(self):
        """Print crawl statistics"""
        print("\n=== Crawl statistics ===")
        print(f"Pages crawled: {self.stats['pages_crawled']}")
        print(f"Articles found: {self.stats['articles_found']}")
        print(f"Errors: {self.stats['errors']}")

# Usage example
async def main():
    spider = NewsSpider(max_concurrent=10, delay=0.5)
    
    start_time = time.time()
    
    # Crawl the news site (example URL; replace it with a real site)
    await spider.crawl_news_site('https://example-news.com', max_pages=3)
    
    end_time = time.time()
    
    # Save the results
    await spider.save_articles()
    
    # Print the statistics
    spider.print_stats()
    print(f"Total time: {end_time - start_time:.2f}s")

# Run the crawler
asyncio.run(main())
```

## 6. Performance Optimization Tips

### 6.1 Choosing an Appropriate Concurrency Level

```python
import asyncio
import aiohttp
import time

async def benchmark_concurrency():
    """Benchmark different concurrency levels"""
    url = 'https://httpbin.org/delay/0.5'
    total_requests = 100
    
    concurrency_levels = [1, 5, 10, 20, 50, 100]
    
    for concurrency in concurrency_levels:
        semaphore = asyncio.Semaphore(concurrency)
        
        async def fetch(session):
            async with semaphore:
                async with session.get(url) as response:
                    return response.status
        
        start_time = time.time()
        
        async with aiohttp.ClientSession() as session:
            tasks = [fetch(session) for _ in range(total_requests)]
            results = await asyncio.gather(*tasks)
        
        end_time = time.time()
        duration = end_time - start_time
        rate = total_requests / duration
        
        print(f"Concurrency {concurrency:3d}: {duration:6.2f}s, {rate:6.1f} req/s")

asyncio.run(benchmark_concurrency())
```

### 6.2 Connection Reuse Optimization

```python
import aiohttp
import asyncio

class OptimizedSpider:
    def __init__(self):
        # Tuned connector configuration
        self.connector = aiohttp.TCPConnector(
            limit=100,              # total number of connections
            limit_per_host=20,      # connections per host
            keepalive_timeout=60,   # how long to keep idle connections alive
            enable_cleanup_closed=True,  # clean up closed connections
            ttl_dns_cache=300,      # DNS cache TTL in seconds
            use_dns_cache=True,     # enable DNS caching
            resolver=aiohttp.AsyncResolver(),  # async DNS resolution (requires the aiodns package)
        )
        
        # Tuned timeout settings
        self.timeout = aiohttp.ClientTimeout(
            total=30,      # overall timeout
            connect=10,    # connect timeout
            sock_read=10   # read timeout
        )
    
    async def create_session(self):
        return aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'OptimizedSpider/1.0',
                'Accept-Encoding': 'gzip, deflate',  # enable compression
                'Connection': 'keep-alive'           # keep connections alive
            }
        )
    
    async def close(self):
        await self.connector.close()
```
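
For completeness, a minimal hedged usage sketch of `OptimizedSpider` (the URL is a placeholder). Closing the session also closes the connector it owns, so the explicit `spider.close()` here is just a safety net.

```python
async def main():
    spider = OptimizedSpider()
    session = await spider.create_session()
    try:
        async with session.get('https://httpbin.org/get') as response:
            print(f"Status code: {response.status}")
    finally:
        await session.close()  # also closes the connector it owns
        await spider.close()   # release the connector if it is still open

asyncio.run(main())
```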

## 7. Practice Exercises

### Exercise 1: Multithreaded Image Downloader

Write a multithreaded image downloader that can download images in batches. A starting sketch follows below.
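
As a starting point rather than a full solution, here is a minimal sketch built on `ThreadPoolExecutor`; the URLs and output directory are placeholders.

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def download_image(url, out_dir="images"):
    """Download one image and return the path it was saved to."""
    os.makedirs(out_dir, exist_ok=True)
    filename = os.path.join(out_dir, url.rsplit("/", 1)[-1] or "image.bin")
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    with open(filename, "wb") as f:
        f.write(response.content)
    return filename

def download_all(urls, max_workers=5):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(download_image, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                print(f"✓ {url} -> {future.result()}")
            except Exception as e:
                print(f"✗ {url} - {e}")

# Placeholder URLs; replace them with real image links
download_all(["https://httpbin.org/image/png", "https://httpbin.org/image/jpeg"])
```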

### Exercise 2: Asynchronous API Data Collection

Collect data from several APIs asynchronously and merge the results. A minimal sketch follows below.
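
A minimal sketch for this exercise, fetching several JSON endpoints concurrently and merging them into one dictionary; the httpbin endpoints stand in for real APIs.

```python
import asyncio
import aiohttp

async def fetch_json(session, name, url):
    async with session.get(url) as response:
        return name, await response.json()

async def collect_apis():
    # Placeholder endpoints standing in for real APIs
    endpoints = {
        "ip": "https://httpbin.org/ip",
        "agent": "https://httpbin.org/user-agent",
        "uuid": "https://httpbin.org/uuid",
    }
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, name, url) for name, url in endpoints.items()]
        merged = dict(await asyncio.gather(*tasks))
    return merged

print(asyncio.run(collect_apis()))
```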

### Exercise 3: Concurrent Crawler Monitoring

Add real-time monitoring to a concurrent crawler so it reports crawl progress and performance metrics. One possible direction is sketched below.
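
One possible direction, sketched under the assumption that a background reporter coroutine reads shared counters; the field names and report interval are illustrative.

```python
import asyncio
import time
import aiohttp

async def monitored_crawl(urls, max_concurrent=10, report_every=1.0):
    stats = {"done": 0, "errors": 0, "start": time.time()}
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(session, url):
        async with semaphore:
            try:
                async with session.get(url) as response:
                    await response.read()
            except Exception:
                stats["errors"] += 1
            finally:
                stats["done"] += 1

    async def reporter():
        # Print progress until every URL has been processed
        while stats["done"] < len(urls):
            elapsed = time.time() - stats["start"]
            rate = stats["done"] / elapsed if elapsed > 0 else 0
            print(f"progress {stats['done']}/{len(urls)}, "
                  f"errors {stats['errors']}, {rate:.1f} URLs/s")
            await asyncio.sleep(report_every)

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(reporter(), *(fetch(session, u) for u in urls))

asyncio.run(monitored_crawl(['https://httpbin.org/get'] * 20))
```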

## 8. Lesson Summary

In this lesson we covered:

  1. The basic concepts and advantages of concurrent programming
  2. How to implement multithreaded crawlers
  3. Advanced asynchronous crawling techniques
  4. Concurrency control and resource management
  5. Practical performance optimization tips

## 9. Next Lesson Preview

In the next lesson we will cover:

  • Building and using a proxy pool
  • IP rotation and countering anti-crawling measures
  • User-Agent spoofing
  • Request header optimization strategies

## 10. Homework

  1. Implement a multithreaded downloader that supports resuming interrupted downloads (see the hint below)
  2. Write an async crawler that collects product information from an e-commerce site
  3. Compare the performance of the different concurrency approaches
  4. Implement a distributed crawler system with built-in monitoring
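
A hint for the first assignment only: resuming a download usually relies on the HTTP `Range` header. A minimal sketch, assuming the server supports range requests (the URL is a placeholder):

```python
import os
import requests

def resume_download(url, filename):
    """Continue a partial download using an HTTP Range request."""
    downloaded = os.path.getsize(filename) if os.path.exists(filename) else 0
    headers = {"Range": f"bytes={downloaded}-"} if downloaded else {}

    with requests.get(url, headers=headers, stream=True, timeout=30) as response:
        # 206 = partial content (resumed); 200 = server ignored the Range header
        mode = "ab" if response.status_code == 206 else "wb"
        with open(filename, mode) as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

resume_download("https://httpbin.org/bytes/102400", "example.bin")  # placeholder URL
```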

Tip: a reasonable concurrency level is usually between 10 and 50; going much higher risks getting banned by the target site or exhausting system resources.



  • I hope this helps beginners; I'm just a small programmer devoted to office automation
  • I'd really appreciate a 【❤️ free follow ❤️】, thank you!
  • A 🤞 follow 🤞 + ❤️ like ❤️ + 👍 bookmark 👍 would mean a lot
  • There is also an office automation column; subscriptions welcome: Python Office Automation column
  • There is also a web scraping column; subscriptions welcome: Python Web Scraping Basics column
  • There is also a Python basics column; subscriptions welcome: Python Basics Learning column

