zhangyue19921010 commented on PR #7582: URL: https://github.com/apache/hudi/pull/7582#issuecomment-1367801102
For example, something like this:

```
public class DisruptorExecutor<I, O, E> extends BaseHoodieQueueBasedExecutor<I, O, E> {

  public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> inputItr,
                           HoodieConsumer<O, E> consumer, Function<I, O> transformFunction,
                           Option<String> waitStrategy, Runnable preExecuteRunnable) {
    this(bufferSize, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)),
        Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
  }

  public DisruptorExecutor(final Option<Integer> bufferSize, List<HoodieProducer<I>> producers,
                           Option<HoodieConsumer<O, E>> consumer, final Function<I, O> transformFunction,
                           final Option<String> waitStrategy, Runnable preExecuteRunnable) {
    super(producers, consumer,
        new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable),
        preExecuteRunnable);
  }

  @Override
  protected void doConsume(HoodieMessageQueue<I, O> queue, HoodieConsumer<O, E> consumer) {
    // no-op
    // will do consuming actions in disruptor
  }

  @Override
  public void setup() {
    DisruptorMessageQueue<I, O> disruptorQueue = (DisruptorMessageQueue<I, O>) queue;
    // Before we start producing, we need to set up Disruptor's queue
    disruptorQueue.setHandlers(consumer.get());
    disruptorQueue.start();
  }
}
```

and

```
public void setup(){}

/**
 * Main API to run both production and consumption.
 */
@Override
public E execute() {
  try {
    checkState(this.consumer.isPresent());
    setup();
    // Start consuming/producing asynchronously
    CompletableFuture<Void> consuming = startConsumingAsync();
    CompletableFuture<Void> producing = startProducingAsync();

    // NOTE: To properly support mode when there's no consumer, we have to fall back
    //       to producing future as the trigger for us to shut down the queue
    return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null)
        .whenComplete((ignored, throwable) -> {
          // Close the queue to release the resources
          queue.close();
        })
        .thenApply(ignored -> consumer.get().finish())
        // Block until producing and consuming both finish
        .get();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // In case {@code InterruptedException} was thrown, resetting the interrupted flag
      // of the thread, we reset it (to true) again to permit subsequent handlers
      // to be interrupted as well
      Thread.currentThread().interrupt();
    }
    throw new HoodieException(e);
  }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org