[
https://issues.apache.org/jira/browse/GOBBLIN-1918?focusedWorklogId=881457&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-881457
]
ASF GitHub Bot logged work on GOBBLIN-1918:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Sep/23 19:00
Start Date: 22/Sep/23 19:00
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3787:
URL: https://github.com/apache/gobblin/pull/3787#discussion_r1334726623
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -28,55 +28,107 @@
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
/**
 * A helper class to calculate the size of array buffers in a {@link VectorizedRowBatch}.
 * This estimate is mainly based on the maximum size of each variable length column, which can be resized
 * Since the resizing algorithm for each column can balloon, this can affect likelihood of OOM
 */
+@Slf4j
public class OrcConverterMemoryManager {
+ private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+ private static final int DEFAULT_ENLARGE_FACTOR = 3;
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX = 5.0;
+ // Needs to be greater than 1.0
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN = 1.2;
+ // Given the defaults it will take around 500 records to reach the min enlarge factor - given that the default
+ // batch size is 1000 this is a fairly conservative default
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR = 0.003;
+
private VectorizedRowBatch rowBatch;
+ private int resizeCount = 0;
+ private double smartArrayEnlargeFactorMax;
+ private double smartArrayEnlargeFactorMin;
+ private double smartArrayEnlargeDecayFactor;
+ private boolean enabledSmartSizing = false;
+ int enlargeFactor = 0;
- // TODO: Consider moving the resize algorithm from the converter to this class
- OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+ OrcConverterMemoryManager(VectorizedRowBatch rowBatch, State state) {
this.rowBatch = rowBatch;
+ this.enabledSmartSizing = state.getPropAsBoolean(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
+ this.enlargeFactor = state.getPropAsInt(GobblinOrcWriterConfigs.ENLARGE_FACTOR_KEY, DEFAULT_ENLARGE_FACTOR);
+ this.smartArrayEnlargeFactorMax = state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX, DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX);
+ this.smartArrayEnlargeFactorMin = state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN, DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN);
+ this.smartArrayEnlargeDecayFactor = state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR, DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR);
+ if (enabledSmartSizing) {
+   Preconditions.checkArgument(this.smartArrayEnlargeFactorMax >= 1,
+       String.format("Smart array enlarge factor needs to be larger than 1.0, provided value %s", this.smartArrayEnlargeFactorMax));
+   Preconditions.checkArgument(this.smartArrayEnlargeFactorMin >= 1,
Review Comment:
If it's equal to one, won't that mean resizing does not work as expected? For a specific record that doesn't fit, we would never be able to hold it and would keep retrying the resize.
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -97,4 +149,19 @@ public long getConverterBufferTotalSize() {
return converterBufferTotalSize;
}
+ /**
+  * Resizes the child array based on configuration.
+  * If smart resizing is enabled, it uses an exponential decay algorithm that resizes the array by a smaller amount
+  * the more records the converter has processed, as fluctuations in record size become less likely to differ significantly by then.
+  * Since the writer is closed and reset periodically, this is generally a safe assumption that should prevent large empty array buffers.
+  */
+ public int resize(int rowsAdded, int requestedSize) {
+   resizeCount += 1;
+   log.info(String.format("It has been resized %s times in current writer", resizeCount));
+   if (enabledSmartSizing) {
+     double decayingEnlargeFactor = this.smartArrayEnlargeFactorMax * Math.pow((1 - this.smartArrayEnlargeDecayFactor), rowsAdded - 1);
Review Comment:
why do we use rowsAdded instead of resizeCount here?
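For reference, the decaying enlarge factor in the line under review can be sketched as a standalone function. This is an illustration only, not the PR's actual implementation; the constants mirror the defaults shown in the diff (max 5.0, min 1.2, decay 0.003), and clamping the decayed value at the configured minimum is an assumption based on the config names.

```java
// Standalone sketch of the exponentially decaying enlarge factor
// (illustrative only; constants mirror the defaults from the diff above).
public class DecayingEnlargeFactorSketch {
  static final double FACTOR_MAX = 5.0;   // DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX
  static final double FACTOR_MIN = 1.2;   // DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN
  static final double DECAY = 0.003;      // DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR

  // Starts at FACTOR_MAX and decays toward FACTOR_MIN as more rows are processed.
  static double enlargeFactor(int rowsAdded) {
    double decayed = FACTOR_MAX * Math.pow(1 - DECAY, rowsAdded - 1);
    return Math.max(decayed, FACTOR_MIN); // assumed floor at the configured minimum
  }

  public static void main(String[] args) {
    System.out.println(enlargeFactor(1));    // 5.0: no decay for the first row
    System.out.println(enlargeFactor(500));  // 1.2: decayed value (~1.12) is clamped to the floor
  }
}
```

This also makes the "around 500 records to reach the min enlarge factor" comment in the diff concrete: 5.0 * 0.997^499 is roughly 1.12, which falls below the 1.2 minimum.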
Issue Time Tracking
-------------------
Worklog Id: (was: 881457)
Time Spent: 0.5h (was: 20m)
> Optimize smart resizing for ORC Writer converter buffer
> -------------------------------------------------------
>
> Key: GOBBLIN-1918
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1918
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-core
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> The GobblinOrcWriter contains a converter and a buffer rowbatch. The buffer
> holds the converted Avro -> Orc records before adding them to the native orc
> writer.
> Since it can contain multiple records, it constantly needs to resize the
> columns of the rowbatch in order to hold multiple records. This problem
> affects both performance and memory when resizing is done either too often
> (enlarge factor is too low) or not often enough (enlarge factor is too high
> and thus the buffer dominates the container memory).
> Because there is a bounded number of records that can persist in the buffer
> before getting flushed, we want to reduce the aggressiveness of the resizing
> algorithm the more records that have been processed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)