🌸 Welcome to the Python Office Automation column — use Python to handle office tasks and free up your hands
💻 Personal homepage ——> personal homepage, visits welcome
😸 GitHub homepage ——> GitHub homepage, visits welcome
❓ Zhihu homepage ——> Zhihu homepage, visits welcome
🏳️🌈 CSDN blog homepage: please click ——> the blog homepage of 一晌小贪欢; a follow is appreciated
👍 Column for this series: please click ——> Python Office Automation column; subscriptions appreciated
🕷 There is also a crawler column: please click ——> Python Crawler Basics column; subscriptions appreciated
📕 And a Python basics column: please click ——> Python Basics column; subscriptions appreciated
The author's skill and knowledge are limited; if you spot mistakes in this article, corrections are welcome 🙏
❤️ Thanks to everyone for following! ❤️
## Course Objectives
- Understand the basic concepts of concurrent programming
- Master how to implement a multithreaded crawler
- Learn to use asynchronous programming to improve crawler efficiency
- Understand concurrency control and resource management
## 1. Concurrency Basics
### 1.1 Why Do Crawlers Need Concurrency?
- Network I/O is the main bottleneck for a crawler
- A single-threaded crawler spends most of its time waiting for network responses
- Concurrency can significantly improve crawler efficiency
- It makes full use of system resources
### 1.2 Comparing Concurrency Approaches
```python
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import asyncio
import aiohttp

# Test URL list
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1'
]

def fetch_sync(url):
    """Synchronous request"""
    response = requests.get(url)
    return response.status_code

def test_sync():
    """Test the synchronous crawler"""
    start_time = time.time()
    results = []
    for url in urls:
        result = fetch_sync(url)
        results.append(result)
    end_time = time.time()
    print(f"Synchronous crawler took {end_time - start_time:.2f}s")
    return results

def test_threading():
    """Test the multithreaded crawler"""
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_sync, urls))
    end_time = time.time()
    print(f"Multithreaded crawler took {end_time - start_time:.2f}s")
    return results

async def fetch_async(session, url):
    """Asynchronous request"""
    async with session.get(url) as response:
        return response.status

async def test_async():
    """Test the asynchronous crawler"""
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    end_time = time.time()
    print(f"Asynchronous crawler took {end_time - start_time:.2f}s")
    return results

# Run the tests
if __name__ == "__main__":
    test_sync()                 # ~5 seconds
    test_threading()            # ~1 second
    asyncio.run(test_async())   # ~1 second
```
## 2. Multithreaded Crawlers
### 2.1 A Basic Multithreaded Crawler
```python
import threading
import requests
from queue import Queue

class ThreadSpider:
    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.url_queue = Queue()
        self.result_queue = Queue()
        self.session = requests.Session()
        # Set default request headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def worker(self):
        """Worker thread function"""
        while True:
            url = self.url_queue.get()
            if url is None:
                break
            try:
                response = self.session.get(url, timeout=10)
                result = {
                    'url': url,
                    'status_code': response.status_code,
                    'content': response.text,
                    'headers': dict(response.headers)
                }
                self.result_queue.put(result)
                print(f"✓ Fetched: {url}")
            except Exception as e:
                error_result = {
                    'url': url,
                    'error': str(e)
                }
                self.result_queue.put(error_result)
                print(f"✗ Failed: {url} - {e}")
            finally:
                self.url_queue.task_done()

    def start_workers(self):
        """Start the worker threads"""
        self.threads = []
        for _ in range(self.max_workers):
            thread = threading.Thread(target=self.worker)
            thread.daemon = True
            thread.start()
            self.threads.append(thread)

    def add_urls(self, urls):
        """Add URLs to the queue"""
        for url in urls:
            self.url_queue.put(url)

    def crawl(self, urls):
        """Start crawling"""
        print(f"Crawling {len(urls)} URLs with {self.max_workers} threads")
        # Start the worker threads
        self.start_workers()
        # Enqueue the URLs
        self.add_urls(urls)
        # Wait until every URL has been processed
        self.url_queue.join()
        # Stop the worker threads with sentinel values
        for _ in range(self.max_workers):
            self.url_queue.put(None)
        for thread in self.threads:
            thread.join()
        # Collect the results
        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get())
        return results

# Usage example
urls = [
    'https://httpbin.org/get',
    'https://httpbin.org/user-agent',
    'https://httpbin.org/headers',
    'https://httpbin.org/ip',
    'https://httpbin.org/uuid'
]

spider = ThreadSpider(max_workers=3)
results = spider.crawl(urls)

for result in results:
    if 'error' in result:
        print(f"Error: {result['url']} - {result['error']}")
    else:
        print(f"OK: {result['url']} - status {result['status_code']}")
```
### 2.2 Using ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

class AdvancedThreadSpider:
    def __init__(self, max_workers=5, timeout=10):
        self.max_workers = max_workers
        self.timeout = timeout
        self.session = requests.Session()
        # Configure the session
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Configure the connection pool
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_workers,
            pool_maxsize=max_workers * 2,
            max_retries=3
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def fetch_url(self, url):
        """Fetch a single URL"""
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=self.timeout)
            end_time = time.time()
            return {
                'url': url,
                'status_code': response.status_code,
                'content_length': len(response.content),
                'response_time': end_time - start_time,
                'content': response.text,
                'success': True
            }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'success': False
            }

    def crawl_urls(self, urls, callback=None):
        """Crawl a list of URLs"""
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_url = {
                executor.submit(self.fetch_url, url): url
                for url in urls
            }
            # Handle tasks as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    # Invoke the callback, if any
                    if callback:
                        callback(result)
                except Exception as e:
                    error_result = {
                        'url': url,
                        'error': str(e),
                        'success': False
                    }
                    results.append(error_result)
                    if callback:
                        callback(error_result)
        return results

    def crawl_with_progress(self, urls):
        """Crawl with progress reporting"""
        total = len(urls)
        completed = 0

        def progress_callback(result):
            nonlocal completed
            completed += 1
            if result['success']:
                print(f"[{completed}/{total}] ✓ {result['url']} - {result['response_time']:.2f}s")
            else:
                print(f"[{completed}/{total}] ✗ {result['url']} - {result['error']}")

        return self.crawl_urls(urls, callback=progress_callback)

# Usage example
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/404',
    'https://httpbin.org/json'
] * 3  # repeated 3 times, 15 URLs in total

spider = AdvancedThreadSpider(max_workers=5)
start_time = time.time()
results = spider.crawl_with_progress(urls)
end_time = time.time()

print(f"\nTotal time: {end_time - start_time:.2f}s")
print(f"Succeeded: {sum(1 for r in results if r['success'])}")
print(f"Failed: {sum(1 for r in results if not r['success'])}")
```
## 3. Asynchronous Crawlers
### 3.1 A Basic Asynchronous Crawler
```python
import asyncio
import aiohttp
import time
from aiohttp import ClientTimeout, ClientSession

class AsyncSpider:
    def __init__(self, max_concurrent=10, timeout=10):
        self.max_concurrent = max_concurrent
        self.timeout = ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Connector configuration
        self.connector = aiohttp.TCPConnector(
            limit=100,                      # total connection pool size
            limit_per_host=max_concurrent,  # connections per host
            ttl_dns_cache=300,              # DNS cache TTL in seconds
            use_dns_cache=True,
        )

    async def fetch_url(self, session, url):
        """Fetch a single URL asynchronously"""
        async with self.semaphore:  # limit concurrency
            try:
                start_time = time.time()
                async with session.get(url) as response:
                    content = await response.text()
                    end_time = time.time()
                    return {
                        'url': url,
                        'status_code': response.status,
                        'content_length': len(content),
                        'response_time': end_time - start_time,
                        'content': content,
                        'success': True
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'success': False
                }

    async def crawl_urls(self, urls):
        """Crawl a list of URLs asynchronously"""
        async with ClientSession(
            timeout=self.timeout,
            connector=self.connector,
            headers={'User-Agent': 'AsyncSpider/1.0'}
        ) as session:
            # Create all tasks
            tasks = [self.fetch_url(session, url) for url in urls]
            # Wait for every task to finish
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Normalize exception results
            processed_results = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    processed_results.append({
                        'url': urls[i],
                        'error': str(result),
                        'success': False
                    })
                else:
                    processed_results.append(result)
            return processed_results

    async def crawl_with_progress(self, urls):
        """Crawl asynchronously with progress reporting"""
        total = len(urls)
        completed = 0
        results = []

        async def fetch_with_progress(session, url):
            nonlocal completed
            result = await self.fetch_url(session, url)
            completed += 1
            if result['success']:
                print(f"[{completed}/{total}] ✓ {result['url']} - {result['response_time']:.2f}s")
            else:
                print(f"[{completed}/{total}] ✗ {result['url']} - {result['error']}")
            return result

        async with ClientSession(
            timeout=self.timeout,
            connector=self.connector,
            headers={'User-Agent': 'AsyncSpider/1.0'}
        ) as session:
            tasks = [fetch_with_progress(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        return results

# Usage example
async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/json',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers'
    ] * 4  # repeated 4 times, 20 URLs in total

    spider = AsyncSpider(max_concurrent=5)
    start_time = time.time()
    results = await spider.crawl_with_progress(urls)
    end_time = time.time()

    print(f"\nTotal time: {end_time - start_time:.2f}s")
    print(f"Succeeded: {sum(1 for r in results if r['success'])}")
    print(f"Failed: {sum(1 for r in results if not r['success'])}")

# Run the async crawler
asyncio.run(main())
```
### 3.2 An Advanced Asynchronous Crawler
```python
import asyncio
import aiohttp
import aiofiles
import json
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class CrawlResult:
    url: str
    status_code: Optional[int] = None
    content: Optional[str] = None
    error: Optional[str] = None
    response_time: Optional[float] = None
    success: bool = False

class AdvancedAsyncSpider:
    def __init__(self,
                 max_concurrent=20,
                 timeout=30,
                 retry_times=3,
                 retry_delay=1):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.retry_times = retry_times
        self.retry_delay = retry_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Crawl statistics
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'start_time': None,
            'end_time': None
        }

    async def fetch_with_retry(self, session, url):
        """Asynchronous request with retries"""
        for attempt in range(self.retry_times + 1):
            try:
                start_time = time.time()
                async with session.get(url) as response:
                    content = await response.text()
                    end_time = time.time()
                    return CrawlResult(
                        url=url,
                        status_code=response.status,
                        content=content,
                        response_time=end_time - start_time,
                        success=True
                    )
            except Exception as e:
                if attempt < self.retry_times:
                    await asyncio.sleep(self.retry_delay * (attempt + 1))
                    continue
                else:
                    return CrawlResult(
                        url=url,
                        error=str(e),
                        success=False
                    )

    async def process_url(self, session, url, processor=None):
        """Process a single URL"""
        async with self.semaphore:
            result = await self.fetch_with_retry(session, url)
            # Update statistics
            if result.success:
                self.stats['success'] += 1
            else:
                self.stats['failed'] += 1
            # Apply a custom processor, if any
            if processor and result.success:
                try:
                    processed_data = await processor(result)
                    result.processed_data = processed_data
                except Exception as e:
                    result.error = f"Processor error: {e}"
                    result.success = False
            return result

    async def crawl_batch(self, urls, processor=None, progress_callback=None):
        """Crawl a batch of URLs"""
        self.stats['total'] = len(urls)
        self.stats['start_time'] = time.time()

        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent,
            ttl_dns_cache=300,
            use_dns_cache=True
        )

        async with aiohttp.ClientSession(
            timeout=self.timeout,
            connector=connector,
            headers={
                'User-Agent': 'AdvancedAsyncSpider/1.0',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
            }
        ) as session:
            # Create the tasks
            tasks = []
            for url in urls:
                task = self.process_url(session, url, processor)
                tasks.append(task)
            # Run the tasks and collect results as they finish
            results = []
            for coro in asyncio.as_completed(tasks):
                result = await coro
                results.append(result)
                # Progress callback
                if progress_callback:
                    await progress_callback(result, len(results), len(urls))

        self.stats['end_time'] = time.time()
        return results

    async def save_results(self, results, filename='results.json'):
        """Save the results to a file"""
        data = []
        for result in results:
            item = {
                'url': result.url,
                'status_code': result.status_code,
                'success': result.success,
                'response_time': result.response_time,
                'error': result.error
            }
            if hasattr(result, 'processed_data'):
                item['processed_data'] = result.processed_data
            data.append(item)

        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        print(f"Results saved to {filename}")

    def print_stats(self):
        """Print crawl statistics"""
        if self.stats['start_time'] and self.stats['end_time']:
            duration = self.stats['end_time'] - self.stats['start_time']
            rate = self.stats['total'] / duration if duration > 0 else 0
            print("\n=== Crawl statistics ===")
            print(f"Total:     {self.stats['total']}")
            print(f"Succeeded: {self.stats['success']}")
            print(f"Failed:    {self.stats['failed']}")
            print(f"Duration:  {duration:.2f}s")
            print(f"Rate:      {rate:.2f} URLs/s")

# Example of a custom processor
async def json_processor(result: CrawlResult):
    """JSON payload processor"""
    try:
        data = json.loads(result.content)
        return {
            'type': 'json',
            'keys': list(data.keys()) if isinstance(data, dict) else None,
            'length': len(data) if isinstance(data, (list, dict)) else None
        }
    except (json.JSONDecodeError, TypeError):
        return {'type': 'text', 'length': len(result.content)}

async def progress_callback(result, completed, total):
    """Progress callback"""
    percentage = (completed / total) * 100
    status = "✓" if result.success else "✗"
    print(f"[{completed}/{total}] {status} {result.url} ({percentage:.1f}%)")

# Usage example
async def main():
    urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
        'https://httpbin.org/uuid'
    ] * 10  # 50 URLs

    spider = AdvancedAsyncSpider(max_concurrent=10)
    results = await spider.crawl_batch(
        urls,
        processor=json_processor,
        progress_callback=progress_callback
    )

    spider.print_stats()
    await spider.save_results(results)

# Run it
asyncio.run(main())
```
## 4. Concurrency Control and Resource Management
### 4.1 Rate Limiting
```python
import asyncio
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_calls, time_window):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a call is allowed under the rate limit"""
        while True:
            async with self.lock:
                now = time.time()
                # Drop call records that have left the time window
                while self.calls and self.calls[0] <= now - self.time_window:
                    self.calls.popleft()
                # Under the limit: record this call and proceed
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # Over the limit: compute how long to wait
                sleep_time = self.time_window - (now - self.calls[0])
            # Sleep outside the lock so other coroutines are not blocked
            await asyncio.sleep(max(sleep_time, 0))

class RateLimitedSpider:
    def __init__(self, max_concurrent=10, rate_limit=None):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = rate_limit

    async def fetch_url(self, session, url):
        """Request with rate limiting applied"""
        async with self.semaphore:
            # Apply the rate limit
            if self.rate_limiter:
                await self.rate_limiter.acquire()
            try:
                async with session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Request failed: {url} - {e}")
                return None

# Usage example: at most 5 requests per second
rate_limiter = RateLimiter(max_calls=5, time_window=1.0)
spider = RateLimitedSpider(max_concurrent=10, rate_limit=rate_limiter)
```
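To see the limiter at work end to end, here is a minimal, self-contained sketch that drives `RateLimitedSpider` through an `aiohttp` session; the httpbin URL and the request count of 20 are placeholders chosen for illustration:

```python
import asyncio
import aiohttp

async def demo():
    # Placeholder settings: 5 calls per second, 20 identical requests
    rate_limiter = RateLimiter(max_calls=5, time_window=1.0)
    spider = RateLimitedSpider(max_concurrent=10, rate_limit=rate_limiter)
    urls = ['https://httpbin.org/get'] * 20

    async with aiohttp.ClientSession() as session:
        tasks = [spider.fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"Fetched {sum(1 for r in results if r is not None)} of {len(urls)} pages")

asyncio.run(demo())
```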
### 4.2 Connection Pool Management
```python
import aiohttp
import asyncio

class ConnectionPoolManager:
    def __init__(self,
                 total_connections=100,
                 connections_per_host=10,
                 keepalive_timeout=30):
        self.connector = aiohttp.TCPConnector(
            limit=total_connections,
            limit_per_host=connections_per_host,
            keepalive_timeout=keepalive_timeout,
            enable_cleanup_closed=True,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={
                'User-Agent': 'ConnectionPoolSpider/1.0'
            }
        )
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        await self.connector.close()

# Usage example
async def main():
    urls = ['https://httpbin.org/get'] * 100

    async with ConnectionPoolManager() as session:
        tasks = []
        for url in urls:
            task = session.get(url)
            tasks.append(task)
        responses = await asyncio.gather(*tasks)

        for response in responses:
            print(f"Status: {response.status}")
            response.close()

asyncio.run(main())
```
### 4.3 Memory Management
```python
import asyncio
import aiohttp
import gc
import psutil
import os

class MemoryManagedSpider:
    def __init__(self, max_memory_mb=500):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process(os.getpid())

    def get_memory_usage(self):
        """Current memory usage in MB"""
        return self.process.memory_info().rss / 1024 / 1024

    async def check_memory(self):
        """Check memory usage and collect garbage if it is too high"""
        memory_usage = self.get_memory_usage()
        if memory_usage > self.max_memory_mb:
            print(f"Memory usage too high: {memory_usage:.1f}MB, running garbage collection")
            gc.collect()
            await asyncio.sleep(0.1)  # give the garbage collector a moment
            new_usage = self.get_memory_usage()
            print(f"Memory usage after GC: {new_usage:.1f}MB")

    async def fetch_with_memory_check(self, session, url):
        """Request with a memory check"""
        await self.check_memory()
        try:
            async with session.get(url) as response:
                content = await response.text()
                # Release the large object as soon as it has been processed
                result = len(content)
                del content
                return result
        except Exception:
            return None

# Usage example
async def main():
    spider = MemoryManagedSpider(max_memory_mb=200)

    async with aiohttp.ClientSession() as session:
        tasks = []
        for _ in range(1000):
            url = f'https://httpbin.org/bytes/{1024 * 10}'  # 10KB payload
            task = spider.fetch_with_memory_check(session, url)
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        print(f"Processed {len([r for r in results if r])} requests")

asyncio.run(main())
```
## 5. Hands-On Example: A Concurrent News Site Crawler
```python
import asyncio
import aiohttp
import aiofiles
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class NewsArticle:
    title: str
    url: str
    content: str
    publish_time: Optional[str] = None
    author: Optional[str] = None
    category: Optional[str] = None

class NewsSpider:
    def __init__(self, max_concurrent=20, delay=1):
        self.max_concurrent = max_concurrent
        self.delay = delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.articles = []
        # Crawl statistics
        self.stats = {
            'pages_crawled': 0,
            'articles_found': 0,
            'errors': 0
        }

    async def fetch_page(self, session, url):
        """Fetch a page"""
        async with self.semaphore:
            try:
                await asyncio.sleep(self.delay)  # polite delay between requests
                async with session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        self.stats['pages_crawled'] += 1
                        return content
                    else:
                        print(f"HTTP error {response.status}: {url}")
                        self.stats['errors'] += 1
                        return None
            except Exception as e:
                print(f"Request failed: {url} - {e}")
                self.stats['errors'] += 1
                return None

    def parse_article_list(self, html, base_url):
        """Parse an article list page"""
        soup = BeautifulSoup(html, 'html.parser')
        article_links = []
        # Adjust the selector to the actual site structure
        for link in soup.select('a[href*="/article/"]'):
            href = link.get('href')
            if href:
                full_url = urljoin(base_url, href)
                article_links.append(full_url)
        return article_links

    def parse_article_detail(self, html, url):
        """Parse an article detail page"""
        soup = BeautifulSoup(html, 'html.parser')
        try:
            # Adjust the selectors to the actual site structure
            title = soup.select_one('h1.article-title')
            title = title.get_text(strip=True) if title else "Untitled"

            content_elem = soup.select_one('.article-content')
            content = content_elem.get_text(strip=True) if content_elem else ""

            author_elem = soup.select_one('.article-author')
            author = author_elem.get_text(strip=True) if author_elem else None

            time_elem = soup.select_one('.publish-time')
            publish_time = time_elem.get_text(strip=True) if time_elem else None

            category_elem = soup.select_one('.article-category')
            category = category_elem.get_text(strip=True) if category_elem else None

            return NewsArticle(
                title=title,
                url=url,
                content=content,
                author=author,
                publish_time=publish_time,
                category=category
            )
        except Exception as e:
            print(f"Failed to parse article: {url} - {e}")
            return None

    async def crawl_article_list(self, session, list_url):
        """Crawl an article list page"""
        html = await self.fetch_page(session, list_url)
        if html:
            return self.parse_article_list(html, list_url)
        return []

    async def crawl_article_detail(self, session, article_url):
        """Crawl an article detail page"""
        html = await self.fetch_page(session, article_url)
        if html:
            article = self.parse_article_detail(html, article_url)
            if article:
                self.articles.append(article)
                self.stats['articles_found'] += 1
                print(f"✓ Crawled article: {article.title}")
                return article
        return None

    async def crawl_news_site(self, base_url, max_pages=5):
        """Crawl a news site"""
        print(f"Crawling news site: {base_url}")

        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent
        )

        async with aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        ) as session:
            # Step 1: fetch the article list pages
            article_urls = set()
            list_tasks = []
            for page in range(1, max_pages + 1):
                list_url = f"{base_url}/news/page/{page}"
                task = self.crawl_article_list(session, list_url)
                list_tasks.append(task)

            # Gather all article links
            list_results = await asyncio.gather(*list_tasks)
            for urls in list_results:
                article_urls.update(urls)

            print(f"Found {len(article_urls)} articles")

            # Step 2: crawl the article detail pages
            detail_tasks = []
            for url in article_urls:
                task = self.crawl_article_detail(session, url)
                detail_tasks.append(task)

            # Process in batches to keep memory usage bounded
            batch_size = 50
            for i in range(0, len(detail_tasks), batch_size):
                batch = detail_tasks[i:i + batch_size]
                await asyncio.gather(*batch)
                print(f"Processed {min(i + batch_size, len(detail_tasks))}/{len(detail_tasks)} articles")

    async def save_articles(self, filename='news_articles.json'):
        """Save the articles to a file"""
        data = [asdict(article) for article in self.articles]
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        print(f"Saved {len(self.articles)} articles to {filename}")

    def print_stats(self):
        """Print crawl statistics"""
        print("\n=== Crawl statistics ===")
        print(f"Pages crawled:  {self.stats['pages_crawled']}")
        print(f"Articles found: {self.stats['articles_found']}")
        print(f"Errors:         {self.stats['errors']}")

# Usage example
async def main():
    spider = NewsSpider(max_concurrent=10, delay=0.5)

    start_time = time.time()
    # Crawl the news site (example URL; replace it with a real site)
    await spider.crawl_news_site('https://example-news.com', max_pages=3)
    end_time = time.time()

    # Save the results
    await spider.save_articles()
    # Print the statistics
    spider.print_stats()
    print(f"Total time: {end_time - start_time:.2f}s")

# Run the crawler
asyncio.run(main())
```
## 6. Performance Tuning Tips
### 6.1 Choosing a Suitable Concurrency Level
```python
import asyncio
import aiohttp
import time

async def benchmark_concurrency():
    """Benchmark different concurrency levels"""
    url = 'https://httpbin.org/delay/0.5'
    total_requests = 100
    concurrency_levels = [1, 5, 10, 20, 50, 100]

    for concurrency in concurrency_levels:
        semaphore = asyncio.Semaphore(concurrency)

        async def fetch(session):
            async with semaphore:
                async with session.get(url) as response:
                    return response.status

        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            tasks = [fetch(session) for _ in range(total_requests)]
            results = await asyncio.gather(*tasks)
        end_time = time.time()

        duration = end_time - start_time
        rate = total_requests / duration
        print(f"Concurrency {concurrency:3d}: {duration:6.2f}s, {rate:6.1f} req/s")

asyncio.run(benchmark_concurrency())
```
### 6.2 Optimizing Connection Reuse
```python
import aiohttp
import asyncio

class OptimizedSpider:
    def __init__(self):
        # Tuned connector configuration
        self.connector = aiohttp.TCPConnector(
            limit=100,                         # total number of connections
            limit_per_host=20,                 # connections per host
            keepalive_timeout=60,              # how long to keep idle connections alive
            enable_cleanup_closed=True,        # clean up closed connections
            ttl_dns_cache=300,                 # DNS cache TTL in seconds
            use_dns_cache=True,                # enable the DNS cache
            resolver=aiohttp.AsyncResolver(),  # async DNS resolution (requires aiodns)
        )
        # Tuned timeout settings
        self.timeout = aiohttp.ClientTimeout(
            total=30,      # overall timeout
            connect=10,    # connect timeout
            sock_read=10   # socket read timeout
        )

    async def create_session(self):
        return aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'OptimizedSpider/1.0',
                'Accept-Encoding': 'gzip, deflate',  # enable compression
                'Connection': 'keep-alive'           # keep connections alive
            }
        )

    async def close(self):
        await self.connector.close()
```
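A possible usage sketch for `OptimizedSpider`; the httpbin URLs are placeholders, the spider should be instantiated while the event loop is already running, and `aiohttp.AsyncResolver` needs the optional `aiodns` package installed:

```python
async def main():
    spider = OptimizedSpider()  # create inside the running event loop
    session = await spider.create_session()
    try:
        urls = ['https://httpbin.org/get'] * 10  # placeholder URLs
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(f"Status: {response.status}")
            response.close()
    finally:
        await session.close()
        await spider.close()

asyncio.run(main())
```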
## 7. Practice Exercises
### Exercise 1: Multithreaded Image Downloader
Write a multithreaded image downloader that supports downloading images in batches.
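As a starting point, here is a minimal sketch of one way to approach it with `ThreadPoolExecutor`; the image URLs and the file-naming scheme are placeholders for illustration:

```python
import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_image(url, path, timeout=10):
    """Download one image to `path`; return the path, or None on failure."""
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        with open(path, 'wb') as f:
            f.write(response.content)
        return path
    except Exception as e:
        print(f"Failed: {url} - {e}")
        return None

def download_all(urls, folder='images', max_workers=5):
    """Download images concurrently using a thread pool."""
    os.makedirs(folder, exist_ok=True)
    saved = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(download_image, url, os.path.join(folder, f"img_{i}.png")): url
            for i, url in enumerate(urls)
        }
        for future in as_completed(futures):
            path = future.result()
            if path:
                saved.append(path)
    return saved

# Placeholder URLs; replace them with real image links
image_urls = ['https://httpbin.org/image/png'] * 10
print(f"Downloaded {len(download_all(image_urls))} images")
```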
### Exercise 2: Asynchronous API Data Collection
Collect data from several APIs asynchronously and merge the results.
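One possible shape of a solution, using `asyncio.gather` to query a few placeholder httpbin endpoints and merge their JSON payloads into a single dict:

```python
import asyncio
import aiohttp

async def fetch_json(session, name, url):
    """Fetch one API endpoint and return (name, payload)."""
    try:
        async with session.get(url) as response:
            return name, await response.json()
    except Exception as e:
        return name, {'error': str(e)}

async def collect_apis(endpoints):
    """Query several APIs concurrently and merge the results into one dict."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, name, url) for name, url in endpoints.items()]
        results = await asyncio.gather(*tasks)
    return dict(results)

# Placeholder endpoints; replace them with the APIs you want to aggregate
endpoints = {
    'ip': 'https://httpbin.org/ip',
    'agent': 'https://httpbin.org/user-agent',
    'uuid': 'https://httpbin.org/uuid',
}
merged = asyncio.run(collect_apis(endpoints))
print(merged)
```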
### Exercise 3: Concurrent Crawler Monitoring
Add real-time monitoring to a concurrent crawler to show crawl progress and performance metrics.
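A rough sketch of the idea: a small monitor object that tracks progress, errors, and throughput while the crawl runs. The `CrawlMonitor` class and the placeholder URLs are my own illustration, not a fixed API:

```python
import asyncio
import time
import aiohttp

class CrawlMonitor:
    """Track progress and throughput while a crawl is running."""

    def __init__(self, total):
        self.total = total
        self.done = 0
        self.errors = 0
        self.start = time.time()

    def record(self, success):
        self.done += 1
        if not success:
            self.errors += 1
        elapsed = time.time() - self.start
        rate = self.done / elapsed if elapsed > 0 else 0
        print(f"[{self.done}/{self.total}] errors={self.errors} "
              f"elapsed={elapsed:.1f}s rate={rate:.1f} req/s")

async def monitored_fetch(session, url, monitor):
    try:
        async with session.get(url) as response:
            await response.read()
            monitor.record(success=response.status == 200)
    except Exception:
        monitor.record(success=False)

async def main():
    urls = ['https://httpbin.org/delay/1'] * 20  # placeholder URLs
    monitor = CrawlMonitor(total=len(urls))
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(monitored_fetch(session, u, monitor) for u in urls))

asyncio.run(main())
```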
## 8. Lesson Summary
In this lesson we covered:
- The basic concepts and benefits of concurrent programming
- How to implement multithreaded crawlers
- Advanced techniques for asynchronous crawlers
- Concurrency control and resource management
- Practical performance tuning tips
## 9. Coming Up Next
In the next lesson we will cover:
- Building and using a proxy pool
- IP rotation and anti-anti-crawling techniques
- User-agent spoofing
- Request header optimization strategies
## 10. Homework
- Implement a multithreaded downloader that supports resuming interrupted downloads (a sketch of the resume logic follows below)
- Write an asynchronous crawler that collects product information from an e-commerce site
- Compare the performance of the different concurrency approaches
- Build a distributed crawler system with monitoring
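For the first homework item, here is a sketch of the resuming part only, based on HTTP `Range` requests; combining it with the `ThreadPoolExecutor` pattern from section 2.2 is the actual exercise. The URL and file name are placeholders:

```python
import os
import requests

def download_resumable(url, path, chunk_size=64 * 1024):
    """Download `url` to `path`, resuming a partial file via an HTTP Range request."""
    # Start from the size of any partial file already on disk
    resume_pos = os.path.getsize(path) if os.path.exists(path) else 0
    headers = {'Range': f'bytes={resume_pos}-'} if resume_pos else {}

    with requests.get(url, headers=headers, stream=True, timeout=30) as response:
        # 206 = partial content (resume accepted); 200 = server ignored the Range header
        if response.status_code == 200 and resume_pos:
            resume_pos = 0  # server does not support ranges, start over
        response.raise_for_status()
        mode = 'ab' if resume_pos else 'wb'
        with open(path, mode) as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                f.write(chunk)
    return path

# Placeholder URL; replace it with a real file URL
download_resumable('https://httpbin.org/bytes/102400', 'bigfile.bin')
```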
Tip: a reasonable concurrency level is usually between 10 and 50; pushing it higher can get you banned by the target site or exhaust system resources.
I hope this helps beginners; I am just a humble programmer devoted to office automation.
I would really appreciate a 【❤️ free follow ❤️】, thank you!
A 🤞 follow 🤞 + ❤️ like ❤️ + 👍 bookmark 👍 would mean a lot
There is also an office automation column; subscriptions are welcome: Python Office Automation column
There is also a crawler column; subscriptions are welcome: Python Crawler Basics column
And a Python basics column; subscriptions are welcome: Python Basics column
