想要更有效更快速的抓取网页内容则必须采用多线程Heritrix中提供了一个标准的线程池ToePool它用于管理所有的抓取线程 ToePool和ToeThread都位于orgarchivecrawlerframework包中前面已经说过ToePool的初始化是在CrawlController的initialize()方法中完成的来看一下ToePool以及ToeThread是如何被初始化的以下代码是在CrawlController中用于对ToePool进行初始化的 构造函数 toePool = new ToePool(this); // 按orderxml中的配置实例化并启动线程 toePoolsetSize(ordergetMaxToes()); ToePool的构造函数很简单如下所示 public ToePool(CrawlController c) { super(ToeThreads); ntroller = c; } 它仅仅是调用了父类javalangThreadGroup的构造函数同时将注入的CrawlController赋给类变量这样便建立起了一个线程池的实例了但是那些真正的工作线程又是如何建立的呢? 下面来看一下线程池中的setSize(int)方法从名称上看这个方法很像是一个普通的赋值方法但实际上它并不是那么简单 public void setSize(int newsize) { targetSize = newsize; int difference = newsize getToeCount(); // 如果发现线程池中的实际线程数量小于应有的数量 // 则启动新的线程 if (difference > ) { for(int i = ; i <= difference; i++) { // 启动新线程 startNewThread(); } } // 如果线程池中的线程数量已经达到需要 else { int retainedToes = targetSize; // 将线程池中的线程管理起来放入数组中 Thread[] toes = thisgetToes(); // 循环去除多余的线程 for (int i = ; i < toeslength ; i++) { if(!(toes[i] instanceof ToeThread)) { continue; } retainedToes; if (retainedToes>=) { continue; } ToeThread tt = (ToeThread)toes[i]; ttretire(); } } } // 用于取得所有属于当前线程池的线程 private Thread[] getToes() { Thread[] toes = new Thread[activeCount()+]; // 由于ToePool继承自javalangThreadGroup类 // 因此当调用enumerate(Thread[] toes)方法时 // 实际上是将所有该ThreadGroup中开辟的线程放入 // toes这个数组中以备后面的管理 thisenumerate(toes); return toes; } // 开启一个新线程 private synchronized void startNewThread() { ToeThread newThread = new ToeThread(this nextSerialNumber++); newThreadsetPriority(DEFAULT_TOE_PRIORITY); newThreadstart(); } 通过上面的代码可以得出这样的结论线程池本身在创建的时候并没有任何活动的线程实例只有当它的setSize方法被调用时才有可能创建新线程如果当setSize方法被调用多次而传入不同的参数时线程池会根据参数里所设定的值的大小来决定池中所管理线程数量的增减 当线程被启动后所执行的是其run()方法中的片段接下来看一个ToeThread到底是如何处理从Frontier中获得的链接的 public void run() { String name = controllergetOrder()getCrawlOrderName(); loggerfine(getName()+ started for order +name+); try { while ( true ) { // 检查是否应该继续处理 continueCheck(); setStep(STEP_ABOUT_TO_GET_URI); // 使用Frontier的next方法从Frontier中 // 取出下一个要处理的链接 CrawlURI curi = controllergetFrontier()next(); // 同步当前线程 synchronized(this) { continueCheck(); setCurrentCuri(curi); } /* * 处理取出的链接 */ processCrawlUri(); setStep(STEP_ABOUT_TO_RETURN_URI); // 检查是否应该继续处理 continueCheck(); // 使用Frontier的finished()方法 // 来对刚才处理的链接做收尾工作 // 比如将分析得到的新的链接加入 // 到等待队列中去 synchronized(this) { controllergetFrontier()finished(currentCuri); setCurrentCuri(null); } // 后续的处理 setStep(STEP_FINISHING_PROCESS); lastFinishTime = SystemcurrentTimeMillis(); // 释放链接 controllerreleaseContinuePermission(); if(shouldRetire) { break; // from while(true) } } } catch (EndedException e) { } catch (Exception e) { loggerlog(LevelSEVEREFatal exception in +getName()e); } catch (OutOfMemoryError err) { seriousError(err); } finally { controllerreleaseContinuePermission(); } setCurrentCuri(null); // 清理缓存数据 this(); thishttpRecorder = null; localProcessors = null; loggerfine(getName()+ finished for order +name+); setStep(STEP_FINISHED); controllertoeEnded(); controller = null; } 在上面的方法中很清楚的显示了工作线程是如何从Frontier中取得下一个待处理的链接然后对链接进行处理并调用Frontier的finished方法来收尾释放链接最后清理缓存终止单步工作等另外其中还有一些日志操作主要是为了记录每次抓取的各种状态 很显然以上代码中最重要的一行语句processCrawlUri()它是真正调用处理链来对链接进行处理的代码 |