跳到主要内容

深入分析 crawl 命令的执行过程

今天我们来跟踪学习 scrapy crawl spider_name 命令的执行过程,从这个过程中我们将看到 Scrapy 的引擎模块的作用。它是整个 Scrapy 其他模块共同的沟通主体,在 Scrapy 中处于核心模块的地位,可以说 Scrapy 的引擎模块就是它的大脑。本小节就让我们一起来探索 Scrapy “大脑” 的奥秘吧 !

1. Crawler 类及其相关类

我们先简单介绍下 Scrapy 中几个常用的基础类:Crawler、CrawlerRunner、CrawlerProcess。这些是我们分析 Scrapy 源码的基础知识点。

1.1 Crawler 类

# 源码位置:scrapy/crawler.py
# ...

class Crawler:

def \_\_init\_\_(self, spidercls, settings=None):
if isinstance(spidercls, Spider):
raise ValueError('The spidercls argument must be a class, not an object')

if isinstance(settings, dict) or settings is None:
settings = Settings(settings)

self.spidercls = spidercls
self.settings = settings.copy()
self.spidercls.update_settings(self.settings)

# 初始化一些属性
# ...

# 日志格式化类
lf_cls = load_object(self.settings['LOG\_FORMATTER'])
self.logformatter = lf_cls.from_crawler(self)
# 扩展管理类
self.extensions = ExtensionManager.from_crawler(self)

self.settings.freeze()
self.crawling = False
# 爬虫模块
self.spider = None
# 引擎模块
self.engine = None

@defer.inlineCallbacks
def crawl(self, \*args, \*\*kwargs):
if self.crawling:
raise RuntimeError("Crawling already taking place")
self.crawling = True

try:
self.spider = self._create_spider(\*args, \*\*kwargs)
self.engine = self._create_engine()
# 获取初始下载请求
start_requests = iter(self.spider.start_requests())
# 调用引擎的open\_spider()方法
yield self.engine.open_spider(self.spider, start_requests)
# 生成一个Deferred对象
yield defer.maybeDeferred(self.engine.start)
except Exception:
self.crawling = False
if self.engine is not None:
yield self.engine.close()
raise

def \_create\_spider(self, \*args, \*\*kwargs):
# 实例化Spider类
return self.spidercls.from_crawler(self, \*args, \*\*kwargs)

def \_create\_engine(self):
# 生成引擎实例
return ExecutionEngine(self, lambda _: self.stop())

@defer.inlineCallbacks
def stop(self):
"""Starts a graceful stop of the crawler and returns a deferred that is
fired when the crawler is stopped."""
if self.crawling:
self.crawling = False
yield defer.maybeDeferred(self.engine.stop)

Crawler 类的属性比较丰富,方法较少,最核心的就是这个 crawl() 方法了。可以看到它其实就完成了以下几个动作:

  • 获取初始的 Request 请求;
  • 调用引擎的 open_spider() 方法;
  • 将引擎的 start() 方法添加到 Deferred 对象中;

1.2 CrawlerRunner

该类有比较详细的注释,源码里对它的介绍如下:

这是一个简便的辅助类,用于跟踪、管理和运行已设置中的爬虫程序。

我们先来看该类的实例化方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
# ...

@staticmethod
def \_get\_spider\_loader(settings):
""" Get SpiderLoader instance from settings """
cls_path = settings.get('SPIDER\_LOADER\_CLASS')
# 导入spider loader类
loader_cls = load_object(cls_path)
excs = (DoesNotImplement, MultipleInvalid) if MultipleInvalid else DoesNotImplement
try:
verifyClass(ISpiderLoader, loader_cls)
except excs:
# 打印异常信息
# ...
return loader_cls.from_settings(settings.frozencopy())

def \_\_init\_\_(self, settings=None):
# scrapy全局配置
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
# 根据爬虫名,加载相应的Spdier类
self.spider_loader = self._get_spider_loader(settings)
# 用于保存所有的Crawler对象
self._crawlers = set()
# self.\_active用于保存调用Crawler.crawl()方法,返回的defer对象
self._active = set()
self.bootstrap_failed = False
self._handle_twisted_reactor()

注意比较重要的属性值有:

  • self.settings:保存了全局的配置信息;
  • self.spider_loader:用于根据相应的爬虫名加载对应的 Spider 类;
  • self._crawlers:用于保存所有的 Crawler 对象,使用的是 python 中的集合类型;

接下来我们看看该类中几个重要方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
# ...

@property
def spiders(self):
# 打印告警信息
# ...
return self.spider_loader

def crawl(self, crawler_or_spidercls, \*args, \*\*kwargs):
if isinstance(crawler_or_spidercls, Spider):
# 抛出异常,不能是Spider对象,只能是Spider类或者Crawler对象
# ...

crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, \*args, \*\*kwargs)

def \_crawl(self, crawler, \*args, \*\*kwargs):
self.crawlers.add(crawler)
# 调用crawl()方法生成一个Deferred对象
d = crawler.crawl(\*args, \*\*kwargs)
# 加入活跃爬虫的集合
self._active.add(d)

def \_done(result):
# 执行完后的收尾工作
self.crawlers.discard(crawler)
self._active.discard(d)
self.bootstrap_failed |= not getattr(crawler, 'spider', None)
return result
# 将\_done()方法加入成功和失败的回调链中
return d.addBoth(_done)

def create\_crawler(self, crawler_or_spidercls):
if isinstance(crawler_or_spidercls, Spider):
# 抛出异常,不能是Spider对象,只能是Spider类或者Crawler对象
# ...
if isinstance(crawler_or_spidercls, Crawler):
# 如果是Crawler实例
return crawler_or_spidercls
# 否则创建Crawler实例
return self._create_crawler(crawler_or_spidercls)

def \_create\_crawler(self, spidercls):
if isinstance(spidercls, str):
spidercls = self.spider_loader.load(spidercls)
# 返回Crawler实例
return Crawler(spidercls, self.settings)

上面的代码量都比较少,比较容易理解。不用纠结代码的细节,把握代码的作用即可,对于 Scrapy 框架有一个全貌即可。来简单描述下上面的方法:

  • crawl():爬虫开始执行爬取动作,crawl 命令调用的入口方法;
  • create_crawler():创建一个 Crawler 实例;

在看看这个类剩余的几个方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
# ...

def stop(self):
return defer.DeferredList([c.stop() for c in list(self.crawlers)])

@defer.inlineCallbacks
def join(self):
while self._active:
yield defer.DeferredList(self._active)

def \_handle\_twisted\_reactor(self):
if self.settings.get("TWISTED\_REACTOR"):
verify_installed_reactor(self.settings["TWISTED\_REACTOR"])

来简单说明下上面几个方法:

  • stop()join() 方法都是停止或者同步管理的爬虫,返回一个 Deferred 对象;
  • _handle_twisted_reactor:默认配置中 TWISTED_REACTOR 的值为 None。其校验安装的 reactor 代码也比较简单,就是导入 TWISTED_REACTOR 路径对应的类,然后判断 twisted 中的 reactor 是否为该类的一个实例,不是就抛出异常:
# 源码位置:scrapy/utils/reactor.py
# ...

def verify\_installed\_reactor(reactor_path):
from twisted.internet import reactor
# 导入reactor\_path位置的类
reactor_class = load_object(reactor_path)
# 判断twisted中的reactor是否为其实例
if not isinstance(reactor, reactor_class):
# 抛出异常
# ...

到此,这个类的相关属性及方法就介绍完毕了,接下来我们学习它的一个子类:CrawlerProcess

1.3 CrawlerProcess

该类用于在一个进程中同时运行多个 Scrapy 爬虫,它继承自上面的 CrawlerRunner 类。先学习其 __init__() 方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerProcess(CrawlerRunner):
def \_\_init\_\_(self, settings=None, install_root_handler=True):
# 调用父类的\_\_init\_\_()方法
super(CrawlerProcess, self).__init__(settings)
# 给一些信号设置相应的回调方法
install_shutdown_handlers(self._signal_shutdown)
# 初始化scrapy中默认的日志配置
configure_logging(self.settings, install_root_handler)
# 打印一些基本信息
log_scrapy_info(self.settings)

def \_signal\_shutdown(self, signum, _):
from twisted.internet import reactor
# 注册信号回调方法
install_shutdown_handlers(self._signal_kill)
signame = signal_names[signum]
# ...
reactor.callFromThread(self._graceful_stop_reactor)

def \_signal\_kill(self, signum, _):
from twisted.internet import reactor
install_shutdown_handlers(signal.SIG_IGN)
signame = signal_names[signum]
logger.info('Received %(signame)s twice, forcing unclean shutdown',
{'signame': signame})
reactor.callFromThread(self._stop_reactor)

# ...

可以看到 CrawlerProcess 类对象的初始化方法中先是调用父类的 __init__() 初始化相关属性,接着注册信号的回调方法,主要是终止信号。我们来看看这个 install_shutdown_handlers() 方法的代码:

# 源码位置:scrapy/utils/ossignal.py
# ...

def install\_shutdown\_handlers(function, override_sigint=True):
"""Install the given function as a signal handler for all common shutdown
signals (such as SIGINT, SIGTERM, etc). If override\_sigint is ``False`` the
SIGINT handler won't be install if there is already a handler in place
(e.g. Pdb)
"""
from twisted.internet import reactor
reactor._handleSignals()
# 注册SIGTERM信号的回调
signal.signal(signal.SIGTERM, function)
if signal.getsignal(signal.SIGINT) == signal.default_int_handler or \
override_sigint:
signal.signal(signal.SIGINT, function)
# Catch Ctrl-Break in windows
if hasattr(signal, 'SIGBREAK'):
# 注册SIGBREAK信号的回调
signal.signal(signal.SIGBREAK, function)

可以看到,上面的代码就是一些注册信号的回调方法。接下来我们来看 CrawlerRunner 类中比较重要的几个方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerProcess(CrawlerRunner):
# ...

def start(self, stop_after_crawl=True):
from twisted.internet import reactor
if stop_after_crawl:
d = self.join()
# Don't start the reactor if the deferreds are already fired
if d.called:
return
d.addBoth(self._stop_reactor)

resolver_class = load_object(self.settings["DNS\_RESOLVER"])
resolver = create_instance(resolver_class, self.settings, self, reactor=reactor)
resolver.install_on_reactor()
tp = reactor.getThreadPool()
tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR\_THREADPOOL\_MAXSIZE'))
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
reactor.run(installSignalHandlers=False) # blocking call

def \_graceful\_stop\_reactor(self):
# 先调用stop()方法停止爬虫进程
d = self.stop()
# 最后回调停止reactor的方法
d.addBoth(self._stop_reactor)
return d

def \_stop\_reactor(self, _=None):
from twisted.internet import reactor
try:
reactor.stop()
except RuntimeError: # raised if already stopped or in shutdown stage
pass

# ...


来看这个 start() 方法,其中比较重要的就是调用了 reactor.run() 方法。_graceful_stop_reactor()_stop_reactor() 从名字上就可以看出它们是关于停止 reactor 的方法。

2. crawl 命令追踪

现在我们要追踪前面多次使用的 scrapy crawl xxxx 命令的执行过程。根据前面的学习成果,我们知道 crawl 命令会调用 scrapy/commands/crawl.py 文件中 Command 类的 run() 方法:

# 源码位置:scrapy/commands/crawl.py
# ...

class Command(BaseRunSpiderCommand):
# ...

def run(self, args, opts):
# 没带参数或者带多了参数,抛出异常
# ...

# 获取爬虫名
spname = args[0]

# 核心是调用self.crawler\_process执行爬取动作
crawl_defer = self.crawler_process.crawl(spname, \*\*opts.spargs)

if getattr(crawl_defer, 'result', None) is not None and issubclass(crawl_defer.result.type, Exception):
# 如果结果异常或者没有结果,设置退出码为1
self.exitcode = 1
else:
# 调用self.crawler\_process的start()方法
self.crawler_process.start()

if self.crawler_process.bootstrap_failed or \
(hasattr(self.crawler_process, 'has\_exception') and self.crawler_process.has_exception):
self.exitcode = 1

首先我们从早先的调用代码中可以知道 self.crawler_process 其实是 CrawlerProcess 类的一个实例:

# 源码位置:scrapy/cmdline.py
# ...

def execute(argv=None, settings=None):
# ...

# 核心,设置command类的核心处理类
cmd.crawler_process = CrawlerProcess(settings)

# ...

这样命令的核心执行方法其实就是调用 CrawlerProcess 对象的 crawl() 方法。我们从一小节的学习中可知:CrawlerProcess 对象的 crawl() 方法其实是父类 (CrawlerRunner) 中的 crawl() 方法,且其调用的核心就两行语句:

crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, \*args, \*\*kwargs)

这个 crawler 是 Crawler 类的一个实例,最后在调用 self._crawl() 方法时,通过代码可知该方法其实是调用 Crawler 对象的 crawl() 方法开始执行 Scrapy 爬虫程序:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
# ...

def \_crawl(self, crawler, \*args, \*\*kwargs):
# ...
d = crawler.crawl(\*args, \*\*kwargs)
# ...

到此,我们知道 crawl 命令执行的核心方法其实是 Crawler 对象中的 crawl() 方法。 接下来,我们就是要从这个 crawl() 方法入手。

前面我们已经介绍过 crawl() 方法,该方法会先根据传入的爬虫名来创建 Spider 实例以及 Engine 实例:

self.spider = self._create_spider(\*args, \*\*kwargs)
self.engine = self._create_engine()

接着是得到起始的爬取 URL:

start_requests = iter(self.spider.start_requests())

这部分调用的正是 Spider 对象的 start_requests()方法获取初始的 URLs,我们也可以在自定义的爬虫中通过重写该方法来获取自定义的初始 URLs。

在接下来就是两个 yield 语句,这两句也是核心

yield self.engine.open_spider(self.spider, start_requests)
yield defer.maybeDeferred(self.engine.start)

这样子,我们的跟踪目标就要转入到 Scrapy 源码的核心目录下了,其调用的是引擎模块中的两个方法,位于我们之前接触的 engine.py 文件中。来分别看着两个方法的代码:

# 源码位置: scrapy/core/engine.py
# ...

class ExecutionEngine:
# ...

@defer.inlineCallbacks
def open\_spider(self, spider, start_requests=(), close_if_idle=True):
if not self.has_capacity():
raise RuntimeError("No free spider slot when opening %r" % spider.name)
logger.info("Spider opened", extra={'spider': spider})
# 获取CallLaterOnce的实例,延迟调用一次
nextcall = CallLaterOnce(self._next_request, spider)
# 获取调度器
scheduler = self.scheduler_cls.from_crawler(self.crawler)
# 通过Spider中间件后,获取初始的urls,对应着数据流图的1
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
self.slot = slot
self.spider = spider
# 打开调度器
yield scheduler.open(spider)
yield self.scraper.open_spider(spider)
# 统计用的
self.crawler.stats.open_spider(spider)
yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
# 延迟调用self.\_next\_request()方法,内部使用reactor.callLater()方法实现
slot.nextcall.schedule()
slot.heartbeat.start(5)

看上面的代码,在 open_spider() 方法中我们可以看到,在我们调用的 Spider 对象中产生的起始请求在经过 Spider 中间件处理后再给回引擎模块;接着我们还会打开调度器以及爬取开关 (self.scraper.open_spider()) 和统计开关 (self.crawler.stats.open_spider())。

紧接着就是内部使用 reactor.callLater() 方法实现对 self._next_request() 方法的延迟调用:

# 源码位置:scrapy/utils/reactor.py
# ...

class CallLaterOnce:
"""Schedule a function to be called in the next reactor loop, but only if
it hasn't been already scheduled since the last time it ran.
"""

def \_\_init\_\_(self, func, \*a, \*\*kw):
self._func = func
self._a = a
self._kw = kw
self._call = None

def schedule(self, delay=0):
'''设置延迟调用'''
from twisted.internet import reactor
if self._call is None:
self._call = reactor.callLater(delay, self)

def cancel(self):
if self._call:
self._call.cancel()

def \_\_call\_\_(self):
# 在对象调用后,self.\_call设置为空,并调用方法
self._call = None
return self._func(\*self._a, \*\*self._kw)

# ...

看完上面的代码后,我们可知接下来就要进入 self._next_request() 方法中执行。继续来看该方法的代码:

# 源码位置:scrapy/core/engine.py
# ...

class ExecutionEngine:
# ...

def \_next\_request(self, spider):
slot = self.slot
# 不正确的情况,直接return
# ...

while not self._needs_backout(spider):
"""爬虫不需要返回,然后会从调度器获取下一个请求"""
if not self._next_request_from_scheduler(spider):
break

if slot.start_requests and not self._needs_backout(spider):
try:
request = next(slot.start_requests)
except StopIteration:
slot.start_requests = None
except Exception:
slot.start_requests = None
logger.error('Error while obtaining start requests',
exc_info=True, extra={'spider': spider})
else:
self.crawl(request, spider)

if self.spider_is_idle(spider) and slot.close_if_idle:
self._spider_idle(spider)

注意:一开始的调度器中并没有请求的 URLs,因此 _next_request() 方法每次调用时会从 start_requests 中获取请求,然后调用 self.crawl() 方法。我们来看看具体的 self.crawl() 方法的代码:

# 源码位置:scrapy/core/engine.py
# ...

class ExecutionEngine:
# ...

def crawl(self, request, spider):
if spider not in self.open_spiders:
raise RuntimeError("Spider %r not opened when crawling: %s" % (spider.name, request))
# 将请求加入调度器队列
self.schedule(request, spider)
# 继续调用self.\_next\_request()方法
self.slot.nextcall.schedule()

def schedule(self, request, spider):
self.signals.send_catch_log(signals.request_scheduled, request=request, spider=spider)
# 这里就是简单将请求压入调度
if not self.slot.scheduler.enqueue_request(request):
self.signals.send_catch_log(signals.request_dropped, request=request, spider=spider)

这个代码就非常清楚了,self.crawl() 方法并不直接下载对应 url 的网页,而是将其推入调度器的下载队列,然后再调回 self._next_request() 方法 (对应着就是 self.slot.nextcall.schedule() 这一句)。而真正执行下载任务的代码为:

while not self._needs_backout(spider):
"""爬虫不需要返回,然后会从调度器获取下一个请求"""
if not self._next_request_from_scheduler(spider):
break

这段代码会循环从调度器中获取下一个请求,如果没有则跳出循环往下执行。如果有请求,则会执行 self._download() 方法去下载对应的网页,而这个方法正是在第24节中介绍过的下载网页的入口。

另外,我们还剩下一个 yield 没介绍,就是引擎的启动:yield defer.maybeDeferred(self.engine.start)。该方法比较简单,就是记录下起始时间以及设置 self.running 标志:

# 源码位置:scrapy/core/engine.py
# ...

class ExecutionEngine:
# ...

@defer.inlineCallbacks
def start(self):
"""Start the execution engine"""
if self.running:
raise RuntimeError("Engine already running")
# 记录引擎开始执行时间
self.start_time = time()
# 发送引擎执行信号
yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
# 设置running标识
self.running = True
self._closewait = defer.Deferred()
yield self._closewait

来总结下 crawl 命令的执行流程:

图片描述

crawl命令的执行流程 对于 crawl 命令的追踪到这里就结束了。当然,这里我们还有一些内容没有介绍到,比如调度器如何实现请求的入队、出队等,再比如 Spider 模块和 Engine 模块的交互过程等,这些就留给读者课后继续探索了。

3. 小结

本小节中我们主要介绍了 scrapy/crawler.py 文件中的代码以及追踪了 scrapy crawl xxx 命令的执行过程。通过追踪这个命令,我们可以了解 Scrapy 爬虫的整个运行流程,回过头来在看 Scrapy 的架构图以及数据流图,是不是又多了一份理解与认识?当然,对于 Scrapy 框架源码我们还有许多代码没有介绍到,比如 scrapy 的扩展模块 (scrapy/extensions)、链接抽取模块 (scrapy/linkextractors)。这些部分并不属于 Scrapy 的核心部分,也不是学习 Scrapy 的重点,读者有兴趣可以课后认真研究这些模块的代码,以增强对 Scrapy 框架的认识。好了,整个 Scrapy 框架的的介绍到此就结束了,我们青山不改,江湖再会!