zhangyue19921010 commented on PR #7582:
URL: https://github.com/apache/hudi/pull/7582#issuecomment-1367801102

   For example, something like this:
   ```
   /**
    * Queue-based executor whose message queue is a {@link DisruptorMessageQueue}.
    *
    * <p>Consumption is not driven from {@link #doConsume}; instead the consumer is
    * registered as a handler on the queue in {@link #setup()}, before producing starts.
    *
    * @param <I> type of the records read from the producers
    * @param <O> type of the records handed to the consumer (output of the transform function)
    * @param <E> type of the result handled by the consumer
    */
   public class DisruptorExecutor<I, O, E> extends BaseHoodieQueueBasedExecutor<I, O, E> {

     /**
      * Convenience constructor for a single iterator-backed producer and a mandatory consumer.
      * Wraps {@code inputItr} into an {@link IteratorBasedQueueProducer} and delegates to the
      * list-based constructor.
      */
     public DisruptorExecutor(
         final Option<Integer> bufferSize,
         final Iterator<I> inputItr,
         HoodieConsumer<O, E> consumer,
         Function<I, O> transformFunction,
         Option<String> waitStrategy,
         Runnable preExecuteRunnable) {
       this(
           bufferSize,
           Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)),
           Option.of(consumer),
           transformFunction,
           waitStrategy,
           preExecuteRunnable);
     }

     /**
      * Creates the executor together with its backing {@link DisruptorMessageQueue}; the queue
      * is given the number of producers and the same {@code preExecuteRunnable} as the executor.
      */
     public DisruptorExecutor(
         final Option<Integer> bufferSize,
         List<HoodieProducer<I>> producers,
         Option<HoodieConsumer<O, E>> consumer,
         final Function<I, O> transformFunction,
         final Option<String> waitStrategy,
         Runnable preExecuteRunnable) {
       super(
           producers,
           consumer,
           new DisruptorMessageQueue<>(
               bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable),
           preExecuteRunnable);
     }

     /**
      * Intentionally empty: the consuming actions are performed inside the disruptor itself.
      */
     @Override
     protected void doConsume(HoodieMessageQueue<I, O> queue, HoodieConsumer<O, E> consumer) {
       // nothing to do here
     }

     /**
      * Prepares the disruptor queue; this has to happen before producing starts.
      */
     @Override
     public void setup() {
       final DisruptorMessageQueue<I, O> ringBufferQueue = (DisruptorMessageQueue<I, O>) queue;
       // Register the consumer as the queue's handler first, then start the queue
       ringBufferQueue.setHandlers(consumer.get());
       ringBufferQueue.start();
     }
   }
   ```
   
   and 
   
   ```
   
     /**
      * Hook invoked by {@code execute()} before consuming and producing are started.
      * This implementation does nothing.
      */
     public void setup() {
     }
   
     /**
      * Main API to run both production and consumption.
      */
     @Override
     public E execute() {
       try {
         checkState(this.consumer.isPresent());
         setup();
         // Start consuming/producing asynchronously
         CompletableFuture<Void> consuming = startConsumingAsync();
         CompletableFuture<Void> producing = startProducingAsync();
   
         // NOTE: To properly support mode when there's no consumer, we have to 
fall back
         //       to producing future as the trigger for us to shut down the 
queue
         return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null)
             .whenComplete((ignored, throwable) -> {
               // Close the queue to release the resources
               queue.close();
             })
             .thenApply(ignored -> consumer.get().finish())
             // Block until producing and consuming both finish
             .get();
       } catch (Exception e) {
         if (e instanceof InterruptedException) {
           // In case {@code InterruptedException} was thrown, resetting the 
interrupted flag
           // of the thread, we reset it (to true) again to permit subsequent 
handlers
           // to be interrupted as well
           Thread.currentThread().interrupt();
         }
   
         throw new HoodieException(e);
       }
     }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to