sv2000 commented on a change in pull request #3128:
URL: https://github.com/apache/incubator-gobblin/pull/3128#discussion_r510266907



##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -59,6 +63,18 @@
   static final String ORC_WRITER_PREFIX = "orcWriter.";
  private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
   private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+  @VisibleForTesting
+  static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
+  private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
+  private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
+  private static final int PARALLELISM_WRITERS = 3;

Review comment:
       MAX_WRITER_PARALLELISM or ESTIMATED_WRITER_PARALLELISM? Also add a 
comment explaining what this config is?

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -77,10 +93,61 @@
   private final int batchSize;
   private final Schema avroSchema;
 
+  /**
+   * There are a couple of parameters in the ORC writer that require manual tuning based on record size, given that the
+   * executor running these ORC writers has limited heap space. This helper function wraps them and has a side effect on
+   * the argument {@param properties}.
+   *
+   * Assumption for current implementation:

Review comment:
       It would be good not to make any assumptions about the mode of operation 
inside the GobblinOrcWriter. I see the auto-tuning capability being useful in 
other modes of operation too, such as MR, Gobblin-on-K8s, standalone, etc. 

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -59,6 +63,18 @@
   static final String ORC_WRITER_PREFIX = "orcWriter.";
  private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
   private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+  @VisibleForTesting
+  static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
+  private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
+  private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
+  private static final int PARALLELISM_WRITERS = 3;
+
+  // The serialized record size passed from AVG_RECORD_SIZE is smaller than the actual in-memory representation
+  // of a record. This number represents how many times larger the actual buffer storing a record is than the
+  // serialized size passed down from upstream constructs.
+  @VisibleForTesting
+  static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";

Review comment:
       Will this config be schema-specific? Do we expect to see differences 
between wide rows vs narrow rows? Or is a single scale factor good enough to be 
widely applicable? 
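For illustration, a single scale factor would translate the serialized average record size into an in-memory estimate roughly as follows. This is a minimal sketch in plain Java; the class name, method name, and the factor of 6 are hypothetical and not taken from this PR:

```java
// Hypothetical sketch: converts a serialized average record size into an
// estimated in-memory size using one global scale factor. Whether one factor
// is "good enough" across wide vs. narrow schemas is the open question above.
public class RecordSizeEstimate {
    static long estimateInMemorySize(long avgSerializedBytes, double scaleFactor) {
        // The in-memory representation (Java objects, column vectors) is
        // typically several times larger than the serialized form.
        return Math.round(avgSerializedBytes * scaleFactor);
    }

    public static void main(String[] args) {
        // A 1 KB serialized record with a hypothetical scale factor of 6
        System.out.println(RecordSizeEstimate.estimateInMemorySize(1024, 6.0)); // prints 6144
    }
}
```

A schema-specific factor would simply mean looking this value up per topic/dataset instead of using one constant.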

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -77,10 +93,61 @@
   private final int batchSize;
   private final Schema avroSchema;
 
+  /**
+   * There are a couple of parameters in the ORC writer that require manual tuning based on record size, given that the
+   * executor running these ORC writers has limited heap space. This helper function wraps them and has a side effect on
+   * the argument {@param properties}.
+   *
+   * Assumption for current implementation:
+   * - Running in Gobblin-on-YARN mode is required to enable this feature, as all the memory settings here are
+   * tied to the resources requested from YARN.
+   * - Record size is provided by AVG_RECORD_SIZE, which is a Kafka-related attribute. For other sources, upstream
Review comment:
       Rather than assume that it is a Kafka related attribute, we could have 
source/extractor set the AVG_RECORD_SIZE for the writer. I guess that is what 
you are saying in the second part of the comment, but it would be less 
confusing if we avoid mentioning that this is a Kafka-related attribute. 
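The contract suggested above could look something like the sketch below: the source/extractor publishes the average record size under a generic key, and the writer reads it back without knowing anything about Kafka. This uses plain java.util.Properties in place of Gobblin's State, and the key name is hypothetical:

```java
import java.util.Properties;

// Hypothetical sketch of a source-agnostic record-size contract. The key name
// "writer.avg.record.size" is illustrative, not a real Gobblin config.
public class RecordSizeContract {
    static final String AVG_RECORD_SIZE_KEY = "writer.avg.record.size";

    // Upstream construct (source/extractor) publishes the size it measured.
    static void publishAvgRecordSize(Properties workUnit, long bytes) {
        workUnit.setProperty(AVG_RECORD_SIZE_KEY, Long.toString(bytes));
    }

    // The writer reads it back, falling back to a conservative default
    // when the source did not provide a measurement.
    static long readAvgRecordSize(Properties workUnit, long defaultBytes) {
        return Long.parseLong(
            workUnit.getProperty(AVG_RECORD_SIZE_KEY, Long.toString(defaultBytes)));
    }

    public static void main(String[] args) {
        Properties wu = new Properties();
        RecordSizeContract.publishAvgRecordSize(wu, 512);
        System.out.println(RecordSizeContract.readAvgRecordSize(wu, 1024)); // prints 512
    }
}
```

With this shape, the Kafka extractor is just one producer of the value; any other source can fill in the same key.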

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -77,10 +93,61 @@
   private final int batchSize;
   private final Schema avroSchema;
 
+  /**
+   * There are a couple of parameters in the ORC writer that require manual tuning based on record size, given that the
+   * executor running these ORC writers has limited heap space. This helper function wraps them and has a side effect on
+   * the argument {@param properties}.
+   *
+   * Assumption for current implementation:
+   * - Running in Gobblin-on-YARN mode is required to enable this feature, as all the memory settings here are
+   * tied to the resources requested from YARN.
+   * - Record size is provided by AVG_RECORD_SIZE, which is a Kafka-related attribute. For other sources, upstream
+   * constructs should provide their own representation of record size.
+   *
+   * One should override this behavior when plugging into systems where the assumptions above don't hold.
+   */
+  protected void autoTunedOrcWriterParams(State properties) {
+    double writerRatio = properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(), (double) OrcConf.MEMORY_POOL.getDefaultValue());
+    long availableHeapPerWriter = Math.round(availableHeapSize(properties) * writerRatio / PARALLELISM_WRITERS);
+
+    // Upstream constructs will need to set this value properly
+    long estimatedRecordSize = getEstimatedRecordSize(properties);
+    long rowsBetweenCheck = availableHeapPerWriter * 1024 / estimatedRecordSize;
+    properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+        Math.min(rowsBetweenCheck, (int) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()));
+    // Row batch size should be smaller than rows_between_checks; 4 is just a magic number picked here.
+    long batchSize = Math.min(rowsBetweenCheck / 4, DEFAULT_ORC_WRITER_BATCH_SIZE);
+    properties.setProp(ORC_WRITER_BATCH_SIZE, batchSize);
+    log.info("Tuned the parameter " + OrcConf.ROWS_BETWEEN_CHECKS.name() + " to be:" + rowsBetweenCheck + ","
+        + ORC_WRITER_BATCH_SIZE + " to be:" + batchSize);
+  }
+
+  /**
+   * Calculate the heap size in MB available for ORC writers.
+   */
+  protected long availableHeapSize(State properties) {
+    // Calculate the recommended size as the threshold for memory check
+    long physicalMem = Math.round(properties.getPropAsLong(CONTAINER_MEMORY_MBS_KEY, 4096)

Review comment:
       We should avoid referencing YARN-specific configurations directly here. 
One possible solution would be to add indirection here by introducing 
OrcWriter-specific memory configs, e.g. gobblin.orc.writer.jvm.memory.mbs. This 
config can then be set to the YARN memory-mbs config value in the pull file. 
Other modes can set the writer memory configs accordingly. This introduces 
additional configs, but we avoid making assumptions about the execution mode in 
the writer. 
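A minimal sketch of that indirection, using plain java.util.Properties in place of Gobblin's State: the writer only ever reads a writer-scoped key, and each execution mode (YARN, MR, K8s, standalone) maps its own memory setting onto that key in the job/pull file. The key name below follows the reviewer's example and is hypothetical:

```java
import java.util.Properties;

// Hypothetical sketch of the proposed indirection: the writer depends only on
// a writer-scoped memory config and never on a YARN-specific one.
public class WriterMemoryConfig {
    static final String WRITER_JVM_MEMORY_MBS = "gobblin.orc.writer.jvm.memory.mbs";

    static long availableHeapMbs(Properties props, long defaultMbs) {
        // Each mode is responsible for wiring its own container/JVM sizing
        // into WRITER_JVM_MEMORY_MBS; the writer just reads the result.
        return Long.parseLong(
            props.getProperty(WRITER_JVM_MEMORY_MBS, Long.toString(defaultMbs)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // e.g. a YARN pull file would set this key from its container-memory config
        props.setProperty(WRITER_JVM_MEMORY_MBS, "4096");
        System.out.println(WriterMemoryConfig.availableHeapMbs(props, 2048)); // prints 4096
    }
}
```

The cost is one extra config key, but the writer no longer needs to know which execution mode it runs under.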



