In Java, how to shut down an ExecutorService when it may submit additional tasks to itself
I have a pipeline of tasks (each task in the pipeline has different parallelism requirements), and each task works in a different ExecutorService. Tasks work on packets of data, so if I have 10 data packets then 10 tasks will be submitted to service1, 1 task per data packet. Once a task submitted to service1 has actually been invoked, it may then submit a new task to work further on the data packet to service2, service3, or not.
The following code works fine, i.e.:
- shutdown() is invoked on service1 after everything has been submitted to service1; awaitTermination() then does not return until all tasks submitted before shutdown() have completed running.
- shutdown() is then invoked on service2, but because all tasks submitted to service1 have completed, and all tasks are submitted to service2 from tasks on service1, all tasks have been submitted to service2 before shutdown() is called on service2.
- And so on for service3.
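    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    ExecutorService[] services = { service1, service2, service3 };
    // Shut the stages down in pipeline order, waiting for each to drain.
    for (ExecutorService service : services) {
        service.shutdown();
        service.awaitTermination(1, TimeUnit.HOURS);
    }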
However, I have now added a case whereby service2 can break a data packet into smaller packets and submit additional tasks on service2, and the code is now failing. The problem is that shutdown() is called on service2 once all the tasks on service1 have completed, but now I want to submit additional service2 tasks from a task running in service2.
My questions:
- Does shutdown() return only after all submitted tasks have finished running, or does it return immediately without stopping already submitted tasks from running? Update: answered below.
- How do I solve my new problem?
"shutdown" simply tells the pool not to accept any more work. It does nothing more. All existing submitted work will be executed as normal. When the queue is drained, the pool will destroy all of its threads and terminate.
The problem here is that you're saying tasks in service2 will submit additional tasks to service2 for processing. There seems to be no way to know when you should actually call shutdown. But alas, there is an alternative, assuming these smaller packets don't break down further in the service.
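To make the failure concrete, here is a minimal sketch (not the asker's actual pipeline) of a task that is already running when shutdown() is called and then tries to resubmit to its own pool. The class name ShutdownSemantics and the latch are made up for illustration, and the sketch assumes Java 8+ for the lambdas and the default rejection policy of Executors.newFixedThreadPool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; none of these names come from the original post.
class ShutdownSemantics {

    /** Returns { taskFinished, resubmitRejected }. */
    static boolean[] demo() {
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        final CountDownLatch shutdownCalled = new CountDownLatch(1);
        final AtomicBoolean finished = new AtomicBoolean(false);
        final AtomicBoolean resubmitRejected = new AtomicBoolean(false);

        // Submitted before shutdown(), so it is allowed to run to completion.
        pool.execute(() -> {
            try {
                shutdownCalled.await(); // make sure shutdown() has happened first
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                // The pool no longer accepts new work, even from its own tasks.
                pool.execute(() -> { });
            } catch (RejectedExecutionException e) {
                resubmitRejected.set(true);
            }
            finished.set(true);
        });

        pool.shutdown();            // returns immediately; the task above keeps running
        shutdownCalled.countDown();

        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { finished.get(), resubmitRejected.get() };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("finished=" + result[0] + ", resubmitRejected=" + result[1]);
    }
}
```

The already-running task completes normally, but its attempt to hand more work to the same pool is rejected, which matches the situation described for service2.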
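    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;

    List<Future<Void>> service2Futures = new ArrayList<Future<Void>>();
    service2Futures.add(service2.submit(new Callable<Void>() {
        public Void call() throws Exception {
            // Do your work, submit more stuff to service2.
            // If you submit Callables, you could use Future.get() to wait on the
            // results.
            return null;
        }
    }));
    // Block until every top-level task has finished (and has done its submitting).
    for (Future<Void> future : service2Futures) {
        future.get();
    }
    service2.shutdown();
    ...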
What's going on here is that you're storing Future objects for the top-level submitted tasks (you'll have to use Callable and not Runnable). Instead of shutting the pool down immediately after submission, you simply collect up the Future objects. You then wait until they are all done running by cycling through them and calling get() on each one. The get() method blocks until the thread running that task has completed.
At that point, all of the top-level tasks are complete, and they will have submitted their second-level tasks. You can then issue a shutdown. This assumes the second-level tasks don't submit more stuff to service2.
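A runnable sketch of this approach might look like the following. The class TwoLevelShutdown, the pool size, and the "processing" (just incrementing a counter) are placeholders for illustration, and it relies on the same assumption as above: second-level tasks never submit further work to service2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; the packet handling is a stand-in for real work.
class TwoLevelShutdown {

    /** Returns how many second-level pieces were processed. */
    static int process(final int packets, final int piecesPerPacket) {
        final ExecutorService service2 = Executors.newFixedThreadPool(4);
        final AtomicInteger processed = new AtomicInteger();
        List<Future<Void>> topLevel = new ArrayList<>();

        for (int p = 0; p < packets; p++) {
            topLevel.add(service2.submit(new Callable<Void>() {
                @Override
                public Void call() {
                    // Top-level task: split the packet and resubmit the pieces.
                    for (int i = 0; i < piecesPerPacket; i++) {
                        service2.execute(() -> processed.incrementAndGet());
                    }
                    return null;
                }
            }));
        }

        try {
            // Once every top-level task is done, all second-level tasks
            // have been submitted, so it is safe to stop accepting work.
            for (Future<Void> future : topLevel) {
                future.get();
            }
            service2.shutdown();
            // Queued second-level tasks still run; wait for them to drain.
            if (!service2.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("service2 did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(process(10, 3));
    }
}
```

Note that the top-level tasks only submit their pieces and do not wait for them, so they cannot starve the fixed-size pool; the final awaitTermination() is what waits for the second-level work.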
This all being said, if you're using Java 7, you should consider taking a look at ForkJoinPool and RecursiveTask instead. It probably makes more sense for what you're doing.
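    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveAction;

    ForkJoinPool forkJoinPool = new ForkJoinPool();
    RecursiveAction action = new RecursiveAction() {
        protected void compute() {
            // Break down here and build actions
            RecursiveAction smallerActions[] = ...;
            // Runs all sub-actions and waits for them to complete
            invokeAll(smallerActions);
        }
    };
    Future<Void> future = forkJoinPool.submit(action);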
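As a fuller illustration of the fork/join idea, here is a small self-contained sketch in which a "data packet" is modelled as an int array and the work is simply doubling each element. The class PacketSplitter, the THRESHOLD value, and the doubling step are all assumptions made for the example, not part of the original pipeline.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: recursively splits a packet until the pieces are small.
class PacketSplitter extends RecursiveAction {
    private static final int THRESHOLD = 4; // arbitrary cut-off for this sketch

    private final int[] packet;
    private final int from;
    private final int to;

    PacketSplitter(int[] packet, int from, int to) {
        this.packet = packet;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: process this slice directly.
            for (int i = from; i < to; i++) {
                packet[i] *= 2;
            }
            return;
        }
        // Too big: break it into two smaller packets and run both.
        int mid = (from + to) >>> 1;
        invokeAll(new PacketSplitter(packet, from, mid),
                  new PacketSplitter(packet, mid, to));
    }

    /** Processes the whole packet in place and returns it. */
    static int[] process(int[] packet) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke() blocks until the action and all of its sub-actions finish.
            pool.invoke(new PacketSplitter(packet, 0, packet.length));
        } finally {
            pool.shutdown();
        }
        return packet;
    }

    public static void main(String[] args) {
        int[] out = process(new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 });
        System.out.println(java.util.Arrays.toString(out));
    }
}
```

Because invoke() only returns once every recursively created sub-action has completed, there is no need to work out when it is safe to call shutdown(), however deep the splitting goes.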