In Java how to shutdown executorservice when it may submit additional tasks to itself -


i have pipeline of tasks (each task in pipeline has different parallelism requirements), each task works in different executorservice. tasks work on packets of data, if have 10 datapackets 10 tasks submitted service1, 1 task per data packet. once task submitted service1 has invoked may submit new task work further on datapacket service2, service3 or not.

the following code works fine, i.e.:

  • shutdown() invoked on service1 after has been submitted service1
  • then awaittermination() not return until tasks submitted before shutdown() have completed running. -- shutdown() invoked on service2 because tasks submitted service1 have completed, , tasks submitted service2 tasks on service1 tasks have been submitted service2 before shutdown() called on service2. -- , on service3

    executorservice[] services = {         service1,         service2,         service3};  int count = 0; for(executorservice service: services) {        service.shutdown();     service.awaittermination(1, timeunit.hours); } 

however have added case whereby service2 can break datapacket smaller packet , submit additional tasks on service2 , code failing. problem shutdown() called on service2 once tasks on service1 have completed, want submit additional service2 tasks task running in service2

my questions:

  1. does shutdown() rerun after submitted tasks have finished running, or return doesn't stop submitted tasks running ? update:answered below
  2. how solve new problem ?

"shutdown" tells pool not accept more work. nothing more. existing submitted work executed normal. when queue drained, pool destroy it's threads , terminate.

the problem here you're saying tasks in service2 submit additional tasks service2 processing. there seems no way know when should call shutdown. alas, there alternative, assuming these smaller packets don't break down further service.

list<future<void>> service2futures = new arraylist<future<void>>();  service2futures.add(service2.submit(new callable<void>() {   public void call() throws exception {     // work, submit more stuff service2     // if submit callables, use future.get() wait on     // results.     return null;   } }));  (future<void> future : service2futures) {   future.get(); }  service2.shutdown(); ... 

what's going on here you're storing future objects top level submitted tasks (you'll have use callable , not runnable). instead of shutting pool down after submission, collect future objects. wait until done running cycling through them, , calling get() on each one. "get()" method blocks until thread running task has completed.

at point, of top level tasks complete, , have submitted second level tasks. can issue shutdown. assumes second level tasks don't submit more stuff service2.

this being said, if you're using java 7, should consider taking @ forkjoinpool , recursivetask instead. makes more sense you're doing.

forkjoinpool forkjoinpool = new forkjoinpool(); recursiveaction action = new recursiveaction() {     protected void compute() {          // break down here , build actions          recursiveaction smalleractions[] = ...;           invokeall(smalleractions);     } };  future<void> future = forkjoinpool.submit(action); 

Comments

Popular posts from this blog

django - How can I change user group without delete record -

java - Need to add SOAP security token -

java - EclipseLink JPA Object is not a known entity type -