电脑故障

位置:IT落伍者 >> 电脑故障 >> 浏览文章

任务列表分派给多个线程的策略和方法


发布日期:2021/3/20
 

多线程下载由来已久如 FlashGetNetAnts 等工具它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围)首先能读取出请求内容 (即欲下载的文件) 的大小划分出若干区块把区块分段分发给每个线程去下载线程从本段起始处下载数据及至段尾多个线程下载的内容最终会写入到同一个文件中

只研究有用的工作中的需求要把多个任务分派给多个线程去执行这其中就会有一个任务列表指派到线程的策略思考已知 一个待执行的任务列表 指定要启动的线程数;问题是每个线程实际要执行哪些任务

策略是任务列表连续按线程数分段先保证每线程平均能分配到的任务数余下的任务从前至后依次附加到线程中只是数量上实际每个线程执行的任务都还是连续的如果出现那种僧多(线程) 粥(任务) 少的情况实际启动的线程数就等于任务数一挑一这里只实现了每个线程各扫自家门前雪动作快的完成后眼见别的线程再累都是爱莫能助

实现及演示代码如下由三个类实现写在了一个 java 文件中TaskDistributor 为任务分发器Task 为待执行的任务WorkThread 为自定的工作线程代码中运用了命令模式如若能配以监听器用上观察者模式来控制 UI 显示就更绝妙不过了就能实现像下载中的区块着色跳跃的动感了在此定义下一步的着眼点了

代码中有较为详细的注释看这些注释和执行结果就很容易理解的main() 是测试方法

packagemon;

importjavautilArrayList;

importjavautilList;

/**

*指派任务列表给线程的分发器

*@authorUnmi

*QQ:Email:

*MSN:

*/

publicclassTaskDistributor{

/**

*测试方法

*@paramargs

*/

publicstaticvoidmain(String[]args){

//初始化要执行的任务列表

ListtaskList=newArrayList();

for(inti=;i<;i++){

taskListadd(newTask(i));

}

//设定要启动的工作线程数为

intthreadCount=;

List[]taskListPerThread=distributeTasks(taskListthreadCount);

Systemoutprintln(实际要启动的工作线程数+taskListPerThreadlength);

for(inti=;i<taskListPerThreadlength;i++){

ThreadworkThread=newWorkThread(taskListPerThread[i]i);

workThreadstart();

}

}

/**

*把List中的任务分配给每个线程先平均分配剩于的依次附加给前面的线程

*返回的数组有多少个元素(List)就表明将启动多少个工作线程

*@paramtaskList待分派的任务列表

*@paramthreadCount线程数

*@return列表的数组每个元素中存有该线程要执行的任务列表

*/

publicstaticList[]distributeTasks(ListtaskListintthreadCount){

//每个线程至少要执行的任务数假如不为零则表示每个线程都会分配到任务

intminTaskCount=taskListsize()/threadCount;

//平均分配后还剩下的任务数不为零则还有任务依个附加到前面的线程中

intremainTaskCount=taskListsize()%threadCount;

//实际要启动的线程数如果工作线程比任务还多

//自然只需要启动与任务相同个数的工作线程一对一的执行

//毕竟不打算实现了线程池所以用不着预先初始化好休眠的线程

intactualThreadCount=minTaskCount>?threadCount:remainTaskCount;

//要启动的线程数组以及每个线程要执行的任务列表

List[]taskListPerThread=newList[actualThreadCount];

inttaskIndex=;

//平均分配后多余任务每附加给一个线程后的剩余数重新声明与remainTaskCount

//相同的变量不然会在执行中改变remainTaskCount原有值产生麻烦

intremainIndces=remainTaskCount;

for(inti=;i<taskListPerThreadlength;i++){

taskListPerThread[i]=newArrayList();

//如果大于零线程要分配到基本的任务

if(minTaskCount>){

for(intj=taskIndex;j<minTaskCount+taskIndex;j++){

taskListPerThread[i]add(taskListget(j));

}

taskIndex+=minTaskCount;

}

//假如还有剩下的则补一个到这个线程中

if(remainIndces>){

taskListPerThread[i]add(taskListget(taskIndex++));

remainIndces;

}

}

//打印任务的分配情况

for(inti=;i<taskListPerThreadlength;i++){

Systemoutprintln(线程+i+的任务数+

taskListPerThread[i]size()+区间[

+taskListPerThread[i]get()getTaskId()+

+taskListPerThread[i]get(taskListPerThread[i]size())getTaskId()+]);

}

returntaskListPerThread;

}

}

/**

*要执行的任务可在执行时改变它的某个状态或调用它的某个操作

*例如任务有三个状态就绪运行完成默认为就绪态

*要进一步完善可为Task加上状态变迁的监听器因之决定UI的显示

*/

classTask{

publicstaticfinalintREADY=;

publicstaticfinalintRUNNING=;

publicstaticfinalintFINISHED=;

privateintstatus;

//声明一个任务的自有业务含义的变量用于标识任务

privateinttaskId;

//任务的初始化方法

publicTask(inttaskId){

thisstatus=READY;

thistaskId=taskId;

}

/**

*执行任务

*/

publicvoidexecute(){

//设置状态为运行中

setStatus(TaskRUNNING);

Systemoutprintln(当前线程ID是+ThreadcurrentThread()getName()

+|任务ID是+thistaskId);

//附加一个延时

try{

Threadsleep();

}catch(InterruptedExceptione){

eprintStackTrace();

}

//执行完成改状态为完成

setStatus(FINISHED);

}

publicvoidsetStatus(intstatus){

thisstatus=status;

}

publicintgetTaskId(){

returntaskId;

}

}

/**

*自定义的工作线程持有分派给它执行的任务列表

*/

classWorkThreadextendsThread{

//本线程待执行的任务列表你也可以指为任务索引的起始值

privateListtaskList=null;

privateintthreadId;

/**

*构造工作线程为其指派任务列表及命名线程ID

*@paramtaskList欲执行的任务列表

*@paramthreadId线程ID

*/

publicWorkThread(ListtaskListintthreadId){

thistaskList=taskList;

thisthreadId=threadId;

}

/**

*执行被指派的所有任务

*/

publicvoidrun(){

for(Tasktask:taskList){

taskexecute();

}

}

}

执行结果如下注意观察每个线程分配到的任务数量及区间直到所有的线程完成了所分配到的任务后程序结束

线程的任务数区间[]

线程的任务数区间[]

线程的任务数区间[]

线程的任务数区间[]

线程的任务数区间[]

实际要启动的工作线程数

当前线程ID是Thread|任务ID是

当前线程ID是Thread|任务ID是

当前线程ID是Thread|任务ID是

当前线程ID是Thread|任务ID是

当前线程ID是Thread|任务ID是

当前线程ID是Thread|任务ID是

当前线程ID是Thread|任务ID是

当前线程ID是Thread|任务ID是

上面坦白来只算是基本功夫贴出来还真见笑了还有更为复杂的功能

像多线程的下载工具的确更充分利用了网络资源而且像 FlashGetNetAnts 都实现了假如某个线程下载完了欲先所分配段的内容之后会帮其他线程下载未完成数据直到任务完成;或某一下载线程的未完成段区间已经很小了用不着别人来帮忙时这就涉及到任务的进一步分配再如以上两个工具都能动态增加减小或中止线程越说越复杂了它们原本比这复杂多了这些实现可能定义各种队列来实现如未完成任务队列下载中任务队列和已完成队列难以细究了

上一篇:爪哇语言结构性模式之变压器模式介绍(上)

下一篇:基于JXTA的P2P应用开发