source

Executor Service, 모든 작업이 완료될 때까지 기다리는 방법

goodcode 2022. 8. 3. 23:13
반응형

Executor Service, 모든 작업이 완료될 때까지 기다리는 방법

의 모든 입니까?ExecutorService끝내기 위해?제 태스크는 주로 계산 작업이기 때문에 각 코어에 하나씩 많은 작업을 실행하고 싶습니다..

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask실행 가능한 도구입니다.올바르게 .wait()IllegalMonitorStateException이상한데요, 장난감 예제를 가지고 놀았더니 효과가 있는 것 같았거든요.

uniquePhrases에는 수만 개의 요소가 포함되어 있습니다.른?한 한 것을 있다.

가장 간단한 방법은 원라이너에서 원하는 것을 사용하는 것입니다.용어로 말하면, 수정 또는 랩을 해야 합니다.ComputeDTaskCallable<>유연성이 향상됩니다. 여러분의 입니다.Callable.call()를 사용하지 않는 경우는, 다음과 같이 랩 합니다.

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

할 수 .invokeAll()에서는, 「 입니다.answers의 is is a가 되어 있습니다.Futures(null의 )Executors.callable()은 도움이 수 을 하는 것, 가 되는 것에 를 하는 ComputeDTask을 사용하다

하지 않은 에는 '', '', '아까운', '아까운', '아까운', '아까운invokeAll() 모두)Future는 s, s는 s, s는 s입니다.answers합니다..isDone() ( ) 하면 셧다운이 됩니다.기다려주세요. 기다리십시오.이것을 다시 사용할 수 .ExecutorService여러 사이클 동안 깔끔하게 사용할 수 있습니다.

SO에 관한 몇 가지 질문이 있습니다.

중 어느 것도은 아니지만, 알수 있습니다.Executor/ExecutorService사용해야 합니다.

모든 작업이 완료될 때까지 대기하려면 다음 방법을 사용합니다.wait그럼, 을 참조해 주세요.

또한 스레드 풀을 올바르게 초기화할 수 있도록 를 사용하여 하드웨어 스레드 수를 가져올 수 있습니다.

의 모든 하고 있는 경우ExecutorService「완료」는, 정확하게는 목표가 아니고, 특정의 태스크의 배치가 완료될 때까지 기다립니다.구체적으로는 를 사용할 수 있습니다.

는 '만들기'를 입니다.ExecutorCompletionServiceExecutor, 를 통해 이미 알려진 몇 가지 작업을 전송합니다.CompletionService그런 다음 (어느 블록) 또는 (어느 블록)을 사용하여 완료 큐에서 같은 수의 결과를 도출합니다.제출한 작업에 대한 예상 결과를 모두 그리면 작업이 모두 완료된 것입니다.

인터페이스에서는 명확하지 않기 때문에 다시 한 번 설명하겠습니다.개 돼요.CompletionService얼마나 많은 것을 끌어내야 하는지 알기 위해서요. 「 」, 「 」, 「 」, 「 R&D 」에 합니다.take() 메서드로 됩니다.CompletionService.

Java Concurrency in Practice(Java 동시 실행 방식)」에는, 사용 방법의 몇개의 예가 있습니다.

는, 「」를 호출해 .shutdown()그리고 나서, 기다려라종료(유닛, unitType). 예:awaitTermination(1, MINUTE). Executor Service를 수 waitsyslog.

특정 간격으로 작업이 완료될 때까지 기다릴 수 있습니다.

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

또는 Executor Service를 사용할 수도 있습니다.submit(Runnable) 및 반환되는 Future 객체를 수집하여 각각 get()을 호출하여 그것들이 완료될 때까지 기다립니다.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

중단됨예외를 적절하게 처리하는 것은 매우 중요합니다.이를 통해 사용자 또는 라이브러리 사용자가 긴 프로세스를 안전하게 종료할 수 있습니다.

그냥 사용하다

latch = new CountDownLatch(noThreads)

각 스레드에

latch.countDown();

그리고 장벽으로서

latch.await();

Ilgal Monitor State Exception의 근본 원인:

스레드가 개체의 모니터에서 대기하려고 시도했거나 지정된 모니터를 소유하지 않고 개체의 모니터에서 대기 중인 다른 스레드에 알리기 위해 느려집니다.

코드를 통해 잠금을 소유하지 않고 Executor Service에서 wait()를 호출했습니다.

는 래 below래 、 아 、 아 、 below 。IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

된 모든 작업이까지 기다리려면 방법 중 .ExecutorService

  1. 것을 Future작업submitExecutorService블로킹 콜을 사용하여 상태를 확인합니다.get()Future물건

  2. invokeAll을 사용하는 경우ExecutorService

  3. Count Down Latch 사용

  4. ForkJoinPool 또는 newWorkStealingPool 사용:Executors(Java 8 이후)

  5. Oracle 설명서 페이지의 권장 사항에 따라 풀 종료

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    옵션 1 ~4 대신 옵션5를 사용할 때 모든 작업이 완료될 때까지 대기하려면 변경하십시오.

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    로.

    a while(condition)1분마다 체크합니다.

사용할 수 있습니다.ExecutorService.invokeAll메서드: 모든 작업을 실행하고 모든 스레드가 작업을 완료할 때까지 기다립니다.

여기 완전한 javadoc이 있습니다.

이 메서드의 오버로드 버전을 사용하여 타임아웃을 지정할 수도 있습니다.

다음은 다음 샘플 코드입니다.ExecutorService.invokeAll

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

저도 서류 세트를 캐내야 할 상황이 있습니다.먼저 처리해야 할 초기 "시드" 문서부터 시작합니다. 이 문서에는 처리해야 할 다른 문서로의 링크가 포함되어 있습니다.

제 메인 프로그램에서는, 다음과 같은 것을 쓰고 싶습니다.Crawler여러 개의 스레드를 제어합니다.

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

트리를 탐색하는 경우에도 같은 상황이 발생합니다.루트 노드에 팝업을 하면 각 노드의 프로세서가 필요에 따라 큐에 하위 노드를 추가하고 스레드 집합이 트리 내의 모든 노드를 처리하여 더 이상 존재하지 않게 됩니다.

저는 JVM에서 조금 놀라운 것을 찾을 수 없었습니다.그래서 나는 수업을 썼다.ThreadPool직접 또는 서브클래스를 사용하여 도메인에 적합한 메서드를 추가할 수 있습니다.schedule(Document)도움이 됐으면 좋겠네요!

스레드풀 자바독|메이븐

컬렉션에 모든 스레드를 추가하고 다음을 사용하여 제출합니다.invokeAll를 사용할 수 있는 경우invokeAll의 방법ExecutorService모든 스레드가 완료될 때까지 JVM은 다음 행으로 진행되지 않습니다.

여기 좋은 예가 있습니다.invoke All via Executor Service

태스크를 Runner로 전송하고 다음과 같이 메서드 waitTillDone() 호출을 기다립니다.

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

사용하려면 다음 그래들/매븐 종속성을 추가합니다.'com.github.matejtymes:javafixes:1.0'

상세한 것에 대하여는, https://github.com/MatejTymes/JavaFixes 또는 http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html 를 참조해 주세요.

몇 가지 방법이 있습니다.

번째 Executor Service를 호출할 수 있습니다.shutdownExecutorService.awaitTermination을 실행하면 다음 값이 반환됩니다.

이 실행자가 종료되면 true, 종료 전에 타임아웃이 경과하면 false

그래서:

대기라는 기능이 있습니다.종료 단, 타임아웃을 지정해야 합니다.이것이 반환될 때 모든 작업이 완료되었다는 보장은 아닙니다.이것을 달성할 수 있는 방법이 있나요?

만 요.awaitTermination고리를 틀어서.

wait 사용 중종료

이 실장의 완전한 예를 다음에 나타냅니다.

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

Count Down Latch 사용

또 다른 옵션은 CountDownLatch를 생성하여count이치노는 각각스 each each each each 、 each 、 each 、 each 、 each 。countDownLatch.countDown();메인 스레드가 호출하는 동안countDownLatch.await();.

이 실장의 완전한 예를 다음에 나타냅니다.

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

순회 장벽 사용

다른 접근법은 순환 장벽을 사용하는 것이다.

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

다른 접근방식이 있지만, 이러한 접근방식은 초기 요건을 변경해야 합니다.즉, 다음과 같습니다.

모든 작업이 Executor Service를 사용하여 전송될 때 완료될 때까지 기다리는 방법.execute() 입니다.

작업 완료에 적합하다고 생각되는 지정된 타임아웃으로 실행자가 종료될 때까지 기다립니다.

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

글로벌 풀을 사용하여 작업을 실행해야 할 것 같습니다.

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

는 에 pool.awaitQuiescence이 메서드는 호출자의 스레드를 사용하여 태스크를 실행한 후 실제로 비어 있을 때 되돌아오는 것을 차단합니다.

이건 어때?

Object lock = new Object();
CountDownLatch cdl = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
    executorService.execute(() -> {

        synchronized (lock) {
            cdl.countDown();
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
cdl.await();
synchronized (lock) {
    lock.notifyAll();
}

새로운 작업을 Executor Service에 추가하지 않으면 현재 작업이 모두 완료될 때까지 대기할 수 있습니다.

이에 대한 간단한 대안은 결합과 함께 스레드를 사용하는 것입니다.참조: 스레드 결합

언급URL : https://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish

반응형