[
https://issues.apache.org/jira/browse/GOBBLIN-1290?focusedWorklogId=503784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-503784
]
ASF GitHub Bot logged work on GOBBLIN-1290:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Oct/20 16:02
Start Date: 22/Oct/20 16:02
Worklog Time Spent: 10m
Work Description: sv2000 commented on a change in pull request #3128:
URL: https://github.com/apache/incubator-gobblin/pull/3128#discussion_r510266907
##########
File path:
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -59,6 +63,18 @@
static final String ORC_WRITER_PREFIX = "orcWriter.";
private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+ @VisibleForTesting
+ static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
+ private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
+ private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
+ private static final int PARALLELISM_WRITERS = 3;
Review comment:
MAX_WRITER_PARALLELISM or ESTIMATED_WRITER_PARALLELISM? Also add a
comment explaining what this config is?
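A minimal sketch of what the reviewer asks for — a renamed constant with an explanatory comment. The name `ESTIMATED_WRITER_PARALLELISM` is one of the reviewer's two suggestions, and the comment text is illustrative, not the final code:

```java
// Illustrative sketch only: constant renamed per the review comment, with the
// requested explanatory comment. Not the actual GobblinOrcWriter source.
public class OrcWriterTuningConstants {
    static final String ORC_WRITER_PREFIX = "orcWriter.";

    // Estimated number of ORC writers running concurrently within a single
    // executor/container. The heap available to the process is divided by
    // this factor when deriving each writer's memory budget; it is an
    // estimate used for tuning, not an enforced cap on parallelism.
    static final int ESTIMATED_WRITER_PARALLELISM = 3;

    public static void main(String[] args) {
        System.out.println(ESTIMATED_WRITER_PARALLELISM);
    }
}
```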
##########
File path:
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -77,10 +93,61 @@
private final int batchSize;
private final Schema avroSchema;
+ /**
+  * There are a couple of parameters in the ORC writer that require manual tuning based on record size,
+  * given that the executor running these ORC writers has limited heap space. This helper function
+  * wraps them and has a side effect on the argument {@code properties}.
+  *
+  * Assumptions for the current implementation:
Review comment:
It would be good to not make any assumptions about the mode of operation
inside the GobblinOrcWriter. I see the auto-tuning capability as being useful
in other modes of operation too, such as MR, Gobblin-on-K8s, standalone, etc.
##########
File path:
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -59,6 +63,18 @@
static final String ORC_WRITER_PREFIX = "orcWriter.";
private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+ @VisibleForTesting
+ static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
+ private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
+ private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
+ private static final int PARALLELISM_WRITERS = 3;
+
+ // The serialized record size passed from AVG_RECORD_SIZE is smaller than the actual in-memory
+ // representation of a record. This number represents how many times larger the actual buffer
+ // storing a record is than the serialized size passed down from upstream constructs.
+ @VisibleForTesting
+ static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";
Review comment:
Will this config be schema-specific? Do we expect to see differences
between wide rows vs narrow rows? Or is a single scale factor good enough to be
widely applicable?
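If the answer to the reviewer's question is that a single factor is not widely applicable, one way to keep a global default while allowing schema-specific overrides might look like the sketch below. The per-schema key layout and the default value of 6 are hypothetical, not from the PR:

```java
import java.util.Properties;

// Illustrative only: resolve a scale factor per schema/topic, falling back to
// the global "recordSize.scaleFactor" key, then to a hard-coded default.
// Key naming and the default value are assumptions for this sketch.
public class ScaleFactorResolver {
    static final String GLOBAL_KEY = "recordSize.scaleFactor";
    static final int DEFAULT_SCALE_FACTOR = 6;  // hypothetical default

    // Look up "<schemaName>.recordSize.scaleFactor" first, then the global
    // key, then the hard-coded default.
    static int resolve(Properties props, String schemaName) {
        String perSchema = props.getProperty(schemaName + "." + GLOBAL_KEY);
        if (perSchema != null) {
            return Integer.parseInt(perSchema);
        }
        return Integer.parseInt(
            props.getProperty(GLOBAL_KEY, String.valueOf(DEFAULT_SCALE_FACTOR)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("wideEvent." + GLOBAL_KEY, "12");
        System.out.println(resolve(props, "wideEvent"));   // per-schema override
        System.out.println(resolve(props, "narrowEvent")); // falls back to default
    }
}
```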
##########
File path:
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -77,10 +93,61 @@
private final int batchSize;
private final Schema avroSchema;
+ /**
+  * There are a couple of parameters in the ORC writer that require manual tuning based on record size,
+  * given that the executor running these ORC writers has limited heap space. This helper function
+  * wraps them and has a side effect on the argument {@code properties}.
+  *
+  * Assumptions for the current implementation:
+  * - Running in Gobblin-on-YARN mode to enable this feature, as all the memory settings here are
+  *   relevant to the resources requested from YARN.
+  * - Record size is provided by AVG_RECORD_SIZE, which is a Kafka-related attribute. For other sources, upstream
Review comment:
Rather than assume that it is a Kafka-related attribute, we could have the
source/extractor set the AVG_RECORD_SIZE for the writer. I guess that is what
you are saying in the second part of the comment, but it would be less
confusing if we avoid mentioning that this is a Kafka-related attribute.
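The source-agnostic handoff the reviewer describes could be sketched roughly as follows. The key name and the plain `Properties` stand-in for Gobblin's `State`/`WorkUnit` types are assumptions for illustration only:

```java
import java.util.Properties;

// Sketch of the reviewer's point: any source/extractor publishes an average
// record size under a writer-facing key, so the writer never needs to know
// whether the value came from Kafka. Key name is hypothetical.
public class RecordSizeHandoff {
    static final String AVG_RECORD_SIZE_KEY = "writer.avgRecordSizeBytes";

    // Called by a source/extractor once it has an estimate, e.g. from
    // previously observed data or the upstream system's metadata.
    static void publishAvgRecordSize(Properties workUnit, long bytes) {
        workUnit.setProperty(AVG_RECORD_SIZE_KEY, Long.toString(bytes));
    }

    // The writer reads only the generic key, with a conservative fallback
    // when no source has published an estimate.
    static long readAvgRecordSize(Properties workUnit, long fallbackBytes) {
        return Long.parseLong(
            workUnit.getProperty(AVG_RECORD_SIZE_KEY, Long.toString(fallbackBytes)));
    }

    public static void main(String[] args) {
        Properties workUnit = new Properties();
        publishAvgRecordSize(workUnit, 2048);
        System.out.println(readAvgRecordSize(workUnit, 1024));
    }
}
```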
##########
File path:
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -77,10 +93,61 @@
private final int batchSize;
private final Schema avroSchema;
+ /**
+  * There are a couple of parameters in the ORC writer that require manual tuning based on record size,
+  * given that the executor running these ORC writers has limited heap space. This helper function
+  * wraps them and has a side effect on the argument {@code properties}.
+  *
+  * Assumptions for the current implementation:
+  * - Running in Gobblin-on-YARN mode to enable this feature, as all the memory settings here are
+  *   relevant to the resources requested from YARN.
+  * - Record size is provided by AVG_RECORD_SIZE, which is a Kafka-related attribute. For other sources,
+  *   upstream constructs should provide their own representation of record size.
+  *
+  * One should override this behavior when plugging into systems where the assumptions above don't hold.
+  */
+ protected void autoTunedOrcWriterParams(State properties) {
+   double writerRatio = properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(), (double) OrcConf.MEMORY_POOL.getDefaultValue());
+   long availableHeapPerWriter = Math.round(availableHeapSize(properties) * writerRatio / PARALLELISM_WRITERS);
+
+   // Upstream constructs will need to set this value properly
+   long estimatedRecordSize = getEstimatedRecordSize(properties);
+   long rowsBetweenCheck = availableHeapPerWriter * 1024 / estimatedRecordSize;
+   properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+       Math.min(rowsBetweenCheck, (int) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()));
+   // The row batch size should be smaller than ROWS_BETWEEN_CHECKS; 4 is just a magic number picked here.
+   long batchSize = Math.min(rowsBetweenCheck / 4, DEFAULT_ORC_WRITER_BATCH_SIZE);
+   properties.setProp(ORC_WRITER_BATCH_SIZE, batchSize);
+   log.info("Tuned the parameter " + OrcConf.ROWS_BETWEEN_CHECKS.name() + " to be: " + rowsBetweenCheck + ", "
+       + ORC_WRITER_BATCH_SIZE + " to be: " + batchSize);
+ }
+
+ /**
+  * Calculate the heap size in MB available for ORC writers.
+  */
+ protected long availableHeapSize(State properties) {
+   // Calculate the recommended size as the threshold for memory check
+   long physicalMem = Math.round(properties.getPropAsLong(CONTAINER_MEMORY_MBS_KEY, 4096)
Review comment:
We should avoid referencing Yarn specific configurations directly here.
One possible solution would be to add indirection here by introducing OrcWriter
specific memory configs e.g. gobblin.orc.writer.jvm.memory.mbs. This config can
then be set to the Yarn memory mbs config value in the pull file. Other modes
can set the writer memory configs accordingly. This introduces additional
configs, but we avoid making assumptions about the execution mode in the
writer.
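The indirection the reviewer proposes could be sketched like this, using the reviewer's example key `gobblin.orc.writer.jvm.memory.mbs`. The default value and the plain `Properties` stand-in for Gobblin's `State` type are assumptions of this sketch:

```java
import java.util.Properties;

// Sketch of the proposed indirection: the writer reads only its own memory
// config, and each execution mode wires that key to its native setting (the
// Yarn container memory, a K8s resource limit, a standalone JVM -Xmx, etc.).
public class WriterMemoryResolver {
    // Key name taken from the review comment; default is illustrative.
    static final String WRITER_MEMORY_MBS_KEY = "gobblin.orc.writer.jvm.memory.mbs";
    static final long DEFAULT_WRITER_MEMORY_MBS = 4096;

    static long availableHeapMbs(Properties props) {
        return Long.parseLong(props.getProperty(
            WRITER_MEMORY_MBS_KEY, Long.toString(DEFAULT_WRITER_MEMORY_MBS)));
    }

    public static void main(String[] args) {
        // In a Gobblin-on-YARN deployment the pull file would set
        // gobblin.orc.writer.jvm.memory.mbs from the Yarn memory config;
        // here we simply set it directly.
        Properties props = new Properties();
        props.setProperty(WRITER_MEMORY_MBS_KEY, "2048");
        System.out.println(availableHeapMbs(props));
    }
}
```

This adds one config, but the writer itself stays free of any execution-mode assumptions, matching the reviewer's other comment about MR, Gobblin-on-K8s, and standalone modes.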
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 503784)
Time Spent: 20m (was: 10m)
> Auto tune ORC writer parameters
> -------------------------------
>
> Key: GOBBLIN-1290
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1290
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Lei Sun
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)