跳到主要内容

Scrapy 中的 Pipline管道

本小节中我们将详细介绍 Scrapy 中的 Pipeline 及其多种用法和使用场景。Pipeline 是 Scrapy 框架的一个重要模块,从前面的 Scrapy 架构图中我们可以看到它位于架构图的最左边,用于连续处理从网页中抓取到的每条记录,就像一个流水线工厂加工食品那样,完成食品最后的封装、保存等操作。此外,我们还会介绍 Scrapy 内置的图片管道,可以自动下载对应地址的图片。最后,我们会基于上述内容完成一个小说网站的爬取案例。

1. Scrapy 中的 Pipeline 介绍

Pipeline 的中文意思是管道,类似于工厂的流水线那样。Scrapy 中的 Pipeline 通常是和 Items 联系在一起的,其实就是对 Items 数据的流水线处理。 一般而言,Pipeline 的典型应用场景如下:

  • 数据清洗、去重
  • 验证数据的有效性
  • 按照自定义格式保存数据
  • 存储到合适的数据库中 (如 MySQL、Redis 或者 MongoDB)

通过前面的 Scrapy 架构图可知,Pipeline 位于 Scrapy 数据处理流程的最后一步,但是它也不是必须,Pipeline 默认处于关闭状态。如果需要的话,我们只需要在 settings.py 中设置 ITEM_PIPELINES 属性值即可。它是一个数组值,我们可以定义多个 Item Pipeline,并且在 ITEM_PIPELINES 中设置相应 Pipeline 的优先级。这样 Scrapy 会依次处理这些 Pipelines,最后达到我们想要的效果。

图片描述

item 经过 pipelines 处理 注意:上面的 pipeline 顺序和功能都可以任意调整,保证逻辑性即可。比如有一个去重的 pipeline 和保存到数据库的 pipeline,那么去重的 pipeline 一定要在保存数据库之前,这样保存的就是不重复的数据。

2. 如何编写自己的 Item Pipeline

编写自己的 Item Pipeline 非常简单,我们只需要编写一个简单的类,实现四个特定名称的方法即可 (部分方法非必须)。我们来简单说明下这三个方法:

  • open_spider(spider):非必需,参数 spider 即被关闭的 Spider 对象。这个方法是 MiddlewareManager 类中的方法,在 Spider 开启时被调用,主要做一些初始化操作,如连接数据库、打开要保存的文件等;
  • close_spider(spider):非必需,参数 spider 即被关闭的 Spider 对象。这个方法也是 MiddlewareManager 类中的方法,在 Spider 关闭时被调用,主要做一些如关闭数据库连接、关闭打开的文件等操作;
  • from_crawler(cls, crawler):非必需,在 Spider启用时调用,且早于 open_spider() 方法。这个方法我们很少去重载,可以不用;
  • process_item(item, spider):必须实现。该函数有两个参数,一个是表示被处理的 Item 对象,另一个是生成该 Item 的 Spider 对象。定义的 Item pipeline 会默认调用该方法对 Item 进行处理,这也是 Pipeline 的工作核心;

完成这样一个 Item Pipeline 后,将该类的路径地址添加到 settings.py 中的 ITEM_PIPELINES 中即可。下图是我们一个简单项目完成的两个 pipelines。

图片描述

一个简单项目的 pipelines 示例

3. 实战演练

学习了上面的一些知识,我们来使用一个简单的网站进行实战演练,在该过程中介绍更多的和 Item Pipeline 相关的用法。

假设我们是一名小说爱好者,我想到起点中文网上去找一些好的小说看,我该怎么找呢?起点中文网的月票榜是一个不错的参考方式,如下图所示:

图片描述

起点中文网月票榜 其实简单看一看就知道月票榜的 url 组成:

  • 主体 url:https://www.qidian.com/rank/yuepiao
  • 参数 month:02 表示 2 月份,03 表示 3 月份,目前为止最多到 7 月份;
  • 参数 chn:表示的是分类,-1 表示全部分类。21 表示玄幻,22表示仙侠;
  • 参数 page:表示第几页,一页有20个作品。

目前我们只需要从 01 月份开始到 07 月份的月票榜中,每次都取得第一页的数据,也就是月票榜的前20 名。7 个月份的前 20 名加起来,然后再去重,就得到了曾经的占据月票榜的作品,这中间大概率都是比较好看的书。完成这个简单的需求我们按照如下的步骤进行:

创建初始项目 qidian_yuepiao:

[root@server ~]# pyenv activate scrapy-test
(scrapy-test) [root@server ~]# cd scrapy-test
(scrapy-test) [root@server scrapy-test]# scrapy startproject qidian_yuepia

(scrapy-test) [root@server qidian_yuepiao]# ls
__init__.py items.py middlewares.py pipelines.py settings.py spider

接下来我们准备获取小说作品的字段,大概会获取如下几个数据:

  • 小说名:name;
  • 小说作者:author;
  • 小说类型:fiction_type。比如玄幻、仙侠、科幻等;
  • 小说状态:state。连载还是完结;
  • 封面图片地址:image_url;
  • images:保存图片数据;
  • brief_introduction:作品简介;
  • book_url:小说的具体地址。

根据定义的这些字段,我们可以写出对应的 Items 类,如下:

(scrapy-test) [root@server qidian_yuepiao]# cat items.py 
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html

import scrapy


class QidianYuepiaoItem(scrapy.Item):
# define the fields for your item here like:
name = scrapy.Field()
author = scrapy.Field()
fiction_type = scrapy.Field()
state = scrapy.Field()
image_url = scrapy.Field()
images = scrapy.Field()
brief_introduction = scrapy.Field()
book_url = scrapy.Field()

到了最关键的地方,需要解析网页数据,提取月票榜的作品信息。这个和前面一些,我们只需要完成相应的 xpath 即可。此外,我们会从 01 月份的月票榜开始,每次会新生成一个 url,主要改动的就是月份参数,每次将月份数加一;如果当前月份大于07,则终止。

(scrapy-test) [root@server qidian_yuepiao]# touch spiders/qidian_yuepiao_parse.py

import re

from scrapy import Request
from scrapy.spiders import Spider
from qidian_yuepiao.items import QidianYuepiaoItem


def num\_to\_str(num, size=2, padding='0'):
"""
0 - > 00 1 -> 01 11 -> 11
:param num:
:param size:
:param padding:
:return:
"""
str_num = str(num)
while len(str_num) < size:
str_num = padding + str_num
return str_num


class QidianSpider(Spider):
name = "qidian\_yuepiao\_spider"
start_urls = [
"https://www.qidian.com/rank/yuepiao?month=01&chn=-1&page=1"
]

def parse(self, response):
fictions = response.xpath('//div[@id="rank-view-list"]/div/ul/li')
for fiction in fictions:
name = fiction.xpath('div[@class="book-mid-info"]/h4/a/text()').extract_first()
author = fiction.xpath('div[@class="book-mid-info"]/p[@class="author"]/a[1]/text()').extract_first()
fiction_type = fiction.xpath('div[@class="book-mid-info"]/p[@class="author"]/a[1]/text()').extract_first()
# 注意一定要是列表,不然会报错
image_url = ['http:{}'.format(fiction.xpath('div[@class="book-img-box"]/a/img/@src').extract()[0])]
brief_introduction = fiction.xpath('div[@class="book-mid-info"]/p[@class="intro"]/text()').extract_first()
state = fiction.xpath('div[@class="book-mid-info"]/p[@class="author"]/a[2]/text()').extract()[0]
book_url = fiction.xpath('div[@class="book-mid-info"]/h4/a/@href').extract()[0]

item = QidianYuepiaoItem()
item['name'] = name
item['author'] = author
item['fiction\_type'] = fiction_type
item['brief\_introduction'] = brief_introduction.strip()
item['image\_url'] = image_url
item['state'] = state
item['book\_url'] = book_url

yield item

# 提取月份数,同时也要提取请求的url
url = response.url
regex = "https://(.\*)\?month=(.\*?)&(.\*)"
pattern = re.compile(regex)
m = pattern.match(url)
if not m:
return []
prefix = m.group(1)
month = int(m.group(2))
suffix = m.group(3)

# 大于7月份则停止,目前是2020年7月20日
if month > 7:
return

# 一定要将月份转成01, 02, s03这样的形式,否则不能正确请求到数据
next_month = num_to_str(month + 1)

next_url = f"https://{prefix}?month={next\_month}&{suffix}"
yield Request(next_url)

最后到了我们本节课的重点。首先我想要将数据保存成 json 格式,存储到文本文件中,但是在保存之前,需要对作品去重。因为有些作品会连续好几个月出现在月票榜的前20位置上,会有比较多重复。我们通过作品的 url 地址来唯一确定该小说。因此需要定义两个 Item Pipeline:

import json

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class QidianYuepiaoPipeline:
"""
保存不重复的数据到文本中
"""
def open\_spider(self, spider):
self.file = open("yuepiao\_top.json", 'w+')

def close\_spider(self, spider):
self.file.close()

def process\_item(self, item, spider):
data = json.dumps(dict(item), ensure_ascii=False)
self.file.write(f"{data}\n")
return item


class DuplicatePipeline:
"""
去除重复的数据,重复数据直接抛出异常,不会进入下一个流水线处理
"""
def \_\_init\_\_(self):
self.book_url_set = set()

def process\_item(self, item, spider):
if item['book\_url'] in self.book_url_set:
raise DropItem('duplicate fiction, drop it')
self.book_url_set.add(item['book\_url'])
return item

我来简单介绍下上面实现的两个 pipelines 的代码。首先爬虫抓取的 item 结果经过的是 DuplicatePipeline 这个管道 (我们通过管道的优先级控制),我们在 DuplicatePipeline 中定义了一个全局的集合 (set),在 管道的核心方法process_item() 中,我们先判断传过来的 item 中 book_url 的值是否存在,如果存在则判定重复,然后抛出异常,这样下一个管道 (即 QidianYuepiaoPipeline) 就不会去处理;

在经过的第二个管道 (QidianYuepiaoPipeline) 中,我们主要是将不重复 item 保存到本地文件中,因此我们会在 open_spider() 方法中打开文件句柄,在 close_spider() 方法中关闭文件句柄,而在 process_item() 中将 item 数据保存到指定的文件中。

接着就是将这两个 Pipelines 加到 settings.py 中:

ITEM_PIPELINES = {
'qidian\_yuepiao.pipelines.DuplicatePipeline': 200,
'qidian\_yuepiao.pipelines.QidianYuepiaoPipeline': 300,
}

最后,我们来介绍一个 Scrapy 内置的图片管道,其实现的 Pipeline 代码位置为:scrapy/pipelines/images.py,对应的还有一个内置的文件管道。我们不需要编写任何代码,只需要在 settings.py 中指定下载的图片字段即可:

# 下载图片存储位置
IMAGES_STORE = '/root/scrapy-test/qidian\_yuepiao/qidian\_yuepiao/images'
# 保存下载图片url地址的字段
IMAGES_URLS_FIELD = 'image\_url'
# 图片保存地址字段
IMAGES_RESULT_FIELD = 'images'
IMAGES_THUMBS = {
'small': (102, 136),
'big': (150, 200)
}

# ...

ITEM_PIPELINES = {
'scrapy.pipelines.images.ImagesPipeline': 1,
'qidian\_yuepiao.pipelines.DuplicatePipeline': 200,
'qidian\_yuepiao.pipelines.QidianYuepiaoPipeline': 300,
}

由于 ImagesPipeline 继承自 FilesPipeline,我们可以从官网的介绍中知道该图片下载功能的执行流程如下:

  • 在 spider 中爬取需要下载的图片链接,将其放入 item 的 image_url 字段中;
  • spider 将得到的 item 传送到 pipeline 进行处理;
  • 当 item 到达 Image Pipeline 处理时,它会检测是否有 image_url 字段,如果存在的话,会将该 url 传递给 scrapy 调度器和下载器;
  • 下载完成后会将结果写入 item 的另一个字段 images,images 包含了图片的本地路径、图片校验、以及图片的url;

完成了以上四步之后,我们的这样一个简单需求就算完成了。还等什么,快来运行看看!以下是视频演示: