[GitHub] [hudi] nsivabalan commented on a diff in pull request #5416: [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency

2022-11-02 Thread GitBox


nsivabalan commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r1012392921


##
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java:
##
@@ -18,117 +18,97 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
- * Executor which orchestrates concurrent producers and consumers 
communicating through a bounded in-memory queue. This
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'BoundedInMemoryQueue'. This
  * class takes as input the size limit, queue producer(s), consumer and 
transformer and exposes API to orchestrate
  * concurrent execution of these actors communicating through a central 
bounded queue
  */
-public class BoundedInMemoryExecutor {
+public class BoundedInMemoryExecutor extends HoodieExecutorBase {
 
   private static final Logger LOG = 
LogManager.getLogger(BoundedInMemoryExecutor.class);
-  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
-  // Executor service used for launching write thread.
-  private final ExecutorService producerExecutorService;
-  // Executor service used for launching read thread.
-  private final ExecutorService consumerExecutorService;
-  // Used for buffering records which is controlled by 
HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
-  private final BoundedInMemoryQueue queue;
-  // Producers
-  private final List> producers;
-  // Consumer
-  private final Option> consumer;
-  // pre-execute function to implement environment specific behavior before 
executors (producers/consumer) run
-  private final Runnable preExecuteRunnable;
+  private final HoodieMessageQueue queue;
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, final 
Iterator inputItr,
- BoundedInMemoryQueueConsumer consumer, 
Function transformFunction, Runnable preExecuteRunnable) {
+ IteratorBasedQueueConsumer consumer, 
Function transformFunction, Runnable preExecuteRunnable) {
 this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer producer,
- Option> 
consumer, final Function transformFunction) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer producer,
+ Option> 
consumer, final Function transformFunction) {
 this(bufferLimitInBytes, producer, consumer, transformFunction, 
Functions.noop());
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer producer,
- Option> 
consumer, final Function transformFunction, Runnable preExecuteRunnable) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer producer,
+ Option> 
consumer, final Function transformFunction, Runnable preExecuteRunnable) {
 this(bufferLimitInBytes, Collections.singletonList(producer), consumer, 
transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List> producers,
- Option> 
consumer, final Function transformFunction,
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List> producers,
+ Option> 
consumer, final Function transformFunction,
  final SizeEstimator sizeEstimator, 
Runnable preExecuteRunnable) {
-this.producers = producers;
-this.consumer = consumer;
-this.preExecuteRunnable = preExecuteRunnable;
-// Ensure fixed thread for each producer thread
-this.producerExecutorService = 
Executors.newFixedThreadPool(producers.size(), new 
CustomizedThreadFactory("producer"));
-// Ensure single thread for consumer
-this.consumerExecutorService 

[GitHub] [hudi] nsivabalan commented on a diff in pull request #5416: [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency

2022-10-29 Thread GitBox


nsivabalan commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r1008719537


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -233,6 +244,16 @@ public class HoodieWriteConfig extends HoodieConfig {
   .defaultValue(String.valueOf(4 * 1024 * 1024))
   .withDocumentation("Size of in-memory buffer used for parallelizing 
network reads and lake storage writes.");
 
+  public static final ConfigProperty WRITE_BUFFER_SIZE = 
ConfigProperty
+  .key("hoodie.write.executor.disruptor.buffer.size")
+  .defaultValue(1024)
+  .withDocumentation("The size of the Disruptor Executor ring buffer, must 
be power of 2");
+
+  public static final ConfigProperty WRITE_WAIT_STRATEGY = 
ConfigProperty
+  .key("hoodie.write.executor.disruptor.wait.strategy")
+  .defaultValue("BLOCKING_WAIT")
+  .withDocumentation("Strategy employed for making Disruptor Executor wait 
on a cursor.");

Review Comment:
   Can we list the other possible values (besides the default) in the documentation?



##
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'DisruptorMessageQueue'. This
+ * class takes as queue producer(s), consumer and transformer and exposes API 
to orchestrate
+ * concurrent execution of these actors communicating through disruptor
+ */
+public class DisruptorExecutor extends HoodieExecutorBase {
+
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorExecutor.class);
+  private final HoodieMessageQueue queue;
+
+  public DisruptorExecutor(final Option bufferSize, final Iterator 
inputItr,
+   IteratorBasedQueueConsumer consumer, 
Function transformFunction, Option waitStrategy, Runnable 
preExecuteRunnable) {
+this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option bufferSize, HoodieProducer 
producer,
+   Option> consumer, 
final Function transformFunction) {
+this(bufferSize, producer, consumer, transformFunction, 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), Functions.noop());
+  }
+
+  public DisruptorExecutor(final Option bufferSize, HoodieProducer 
producer,
+   Option> consumer, 
final Function transformFunction, Option waitStrategy, Runnable 
preExecuteRunnable) {
+this(bufferSize, Collections.singletonList(producer), consumer, 
transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option bufferSize, 
List> producers,
+   Option> consumer, 
final Function transformFunction,
+   final Option waitStrategy, Runnable 
preExecuteRunnable) {
+super(producers, consumer, preExecuteRunnable);
+this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, 
waitStrategy, producers.size(), preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  @Override
+  public CompletableFuture startProducers() {
+return CompletableFuture.allOf(producers.stream().map(producer -> {
+  return CompletableFuture.supplyAsync(() -> {
+try {
+  producer.produce(queue);
+} catch (Throwable e) {
+  LOG.error("error producing records", e);
+  throw new HoodieException("Error producing records in disruptor 
executor", e);
+}
+return true;
+  }, 

[GitHub] [hudi] nsivabalan commented on a diff in pull request #5416: [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency

2022-10-01 Thread GitBox


nsivabalan commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r985104403


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -129,6 +131,14 @@ public class HoodieWriteConfig extends HoodieConfig {
   .withDocumentation("Key generator class, that implements 
`org.apache.hudi.keygen.KeyGenerator` "
   + "extract a key out of incoming records.");
 
+  public static final ConfigProperty EXECUTOR_TYPE = ConfigProperty
+  .key("hoodie.write.executor.type")
+  .defaultValue("BOUNDED_IN_MEMORY_EXECUTOR")

Review Comment:
   Can we introduce an enum for this? Also, we can drop the "EXECUTOR" suffix from the 
value. For example, for key generators the enum is named KeyGeneratorType and the value is 
just "SIMPLE"; we don't call it "SIMPLE_KEY_GEN", since that would be repetitive. 
   



##
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieDisruptorEvent.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+public class HoodieDisruptorEvent {

Review Comment:
   Please add Javadocs for this class.



##
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, RowFactory, SaveMode, SparkSession}
+
+import scala.util.Random
+
+object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
+
+  protected val spark: SparkSession = getSparkSession
+
+  val recordNumber = 100
+
+  def getSparkSession: SparkSession = SparkSession.builder()
+.master("local[*]")
+.appName(this.getClass.getCanonicalName)
+.withExtensions(new HoodieSparkSessionExtension)
+.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+.config("spark.sql.session.timeZone", "CTT")
+.config(sparkConf())
+.getOrCreate()
+
+  def sparkConf(): SparkConf = {
+val sparkConf = new SparkConf()
+if (HoodieSparkUtils.gteqSpark3_2) {
+  sparkConf.set("spark.sql.catalog.spark_catalog",
+"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+}
+sparkConf
+  }
+
+  private def createDataFrame(number: Int): DataFrame = {
+val schema = new StructType()
+  .add("c1", IntegerType)
+  .add("c2", StringType)
+
+val rdd = spark.sparkContext.parallelize(0 to number, 2).map { item =>
+  val c1 = Integer.valueOf(item)
+  val c2 = s"abc"
+  RowFactory.create(c1, c2)
+}
+spark.createDataFrame(rdd, schema)
+  }
+
+  /**
+   * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
+   * COW Ingestion:Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
+   * 

+   * BoundInMemory Executor 5557   5607
  70  0.25556.9   1.0X