[ 
https://issues.apache.org/jira/browse/GOBBLIN-1918?focusedWorklogId=881457&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-881457
 ]

ASF GitHub Bot logged work on GOBBLIN-1918:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Sep/23 19:00
            Start Date: 22/Sep/23 19:00
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3787:
URL: https://github.com/apache/gobblin/pull/3787#discussion_r1334726623


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -28,55 +28,107 @@
 import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
 
 /**
  * A helper class to calculate the size of array buffers in a {@link VectorizedRowBatch}.
  * This estimate is mainly based on the maximum size of each variable length column, which can be resized
  * Since the resizing algorithm for each column can balloon, this can affect likelihood of OOM
  */
+@Slf4j
 public class OrcConverterMemoryManager {
 
+  private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+  private static final int DEFAULT_ENLARGE_FACTOR = 3;
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX = 5.0;
+  // Needs to be greater than 1.0
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN = 1.2;
+  // Given the defaults it will take around 500 records to reach the min enlarge factor - given that the default
+  // batch size is 1000 this is a fairly conservative default
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR = 0.003;
+
   private VectorizedRowBatch rowBatch;
+  private int resizeCount = 0;
+  private double smartArrayEnlargeFactorMax;
+  private double smartArrayEnlargeFactorMin;
+  private double smartArrayEnlargeDecayFactor;
+  private boolean enabledSmartSizing = false;
+  int enlargeFactor = 0;
 
-  // TODO: Consider moving the resize algorithm from the converter to this class
-  OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+  OrcConverterMemoryManager(VectorizedRowBatch rowBatch, State state) {
     this.rowBatch = rowBatch;
+    this.enabledSmartSizing = state.getPropAsBoolean(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
+    this.enlargeFactor = state.getPropAsInt(GobblinOrcWriterConfigs.ENLARGE_FACTOR_KEY, DEFAULT_ENLARGE_FACTOR);
+    this.smartArrayEnlargeFactorMax = state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX, DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX);
+    this.smartArrayEnlargeFactorMin = state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN, DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN);
+    this.smartArrayEnlargeDecayFactor = state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR, DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR);
+    if (enabledSmartSizing) {
+      Preconditions.checkArgument(this.smartArrayEnlargeFactorMax >= 1,
+          String.format("Smart array enlarge factor needs to be larger than 1.0, provided value %s", this.smartArrayEnlargeFactorMax));
+      Preconditions.checkArgument(this.smartArrayEnlargeFactorMin >= 1,

Review Comment:
   If it's equal to 1.0, won't that end up with resize not working as expected? For a specific record that doesn't fit, we would never be able to hold it and would keep retrying the resize.



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -97,4 +149,19 @@ public long getConverterBufferTotalSize() {
     return converterBufferTotalSize;
   }
 
+  /**
+   * Resize the child-array size based on configuration.
+   * If smart resizing is enabled, it will use an exponential decay algorithm where it would resize the array by a smaller amount
+   * the more records the converter has processed, as the fluctuation in record size becomes less likely to differ significantly by then
+   * Since the writer is closed and reset periodically, this is generally a safe assumption that should prevent large empty array buffers
+   */
+  public int resize(int rowsAdded, int requestedSize) {
+    resizeCount += 1;
+    log.info(String.format("It has been resized %s times in current writer", resizeCount));
+    if (enabledSmartSizing) {
+      double decayingEnlargeFactor = this.smartArrayEnlargeFactorMax * Math.pow((1 - this.smartArrayEnlargeDecayFactor), rowsAdded - 1);

Review Comment:
   why do we use rowsAdded instead of resizeCount here?
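For readers following the thread, the decaying enlarge factor in the hunk above can be sketched as follows. This is a hedged sketch based only on the config names and defaults visible in the diff, not the PR's exact code; in particular, the clamp to the min factor and the use of Math.ceil are assumptions.

```java
// Sketch of the smart-resize decay from the diff above (assumptions:
// the factor is floored at the min enlarge factor, and the result is
// rounded up; the actual PR may differ).
public class SmartResizeSketch {
  static final double FACTOR_MAX = 5.0;   // DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX
  static final double FACTOR_MIN = 1.2;   // DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN
  static final double DECAY = 0.003;      // DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR

  static int resize(int rowsAdded, int requestedSize) {
    // Factor decays exponentially with the number of rows processed,
    // but never below FACTOR_MIN.
    double factor = FACTOR_MAX * Math.pow(1 - DECAY, rowsAdded - 1);
    factor = Math.max(factor, FACTOR_MIN);
    return (int) Math.ceil(requestedSize * factor);
  }

  public static void main(String[] args) {
    // Early on the factor is at FACTOR_MAX, so resizes are aggressive:
    System.out.println(SmartResizeSketch.resize(1, 100));   // 500
    // After ~500 records the factor has decayed to the FACTOR_MIN floor,
    // matching the "around 500 records" comment in the diff:
    System.out.println(SmartResizeSketch.resize(500, 100)); // 120
  }
}
```

With the defaults shown, 5.0 * 0.997^499 is about 1.12, which is below the 1.2 floor, so by batch-size-1000 the writer is already resizing conservatively.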





Issue Time Tracking
-------------------

    Worklog Id:     (was: 881457)
    Time Spent: 0.5h  (was: 20m)

> Optimize smart resizing for ORC Writer converter buffer
> -------------------------------------------------------
>
>                 Key: GOBBLIN-1918
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1918
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The GobblinOrcWriter contains a converter and a buffer rowbatch. The buffer 
> holds the converted Avro -> Orc records before adding them to the native orc 
> writer.
> Since it can contain multiple records, it constantly needs to resize the 
> columns of the rowbatch in order to hold multiple records. This problem 
> affects both performance and memory when resizing is done either too often 
> (enlarge factor is too low) or not often enough (enlarge factor is too high 
> and thus the buffer dominates the container memory).
> Because there is a bounded number of records that can persist in the buffer 
> before getting flushed, we want to reduce the aggressiveness of the resizing 
> algorithm the more records that have been processed.
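To make the trade-off in the issue description concrete, here is a back-of-the-envelope illustration of fixed geometric resizing. The numbers and helper are illustrative only, not taken from the writer.

```java
// Toy illustration of the fixed-enlarge-factor trade-off described in
// the issue: each resize multiplies capacity, so a low factor resizes
// often (CPU cost) while a high factor overshoots (memory cost).
public class EnlargeFactorDemo {
  // Count the resizes needed to grow a buffer from `start` elements
  // to at least `target` elements with a fixed enlarge factor.
  static int resizesNeeded(int start, int target, int factor) {
    int capacity = start;
    int resizes = 0;
    while (capacity < target) {
      capacity *= factor;
      resizes++;
    }
    return resizes;
  }

  public static void main(String[] args) {
    // Growing a 1024-element child array to hold 1,000,000 elements:
    System.out.println(EnlargeFactorDemo.resizesNeeded(1024, 1_000_000, 2)); // 10 resizes
    System.out.println(EnlargeFactorDemo.resizesNeeded(1024, 1_000_000, 8)); // 4 resizes, ~4x overshoot
  }
}
```

With factor 8 the final capacity is 1024 * 8^4 = 4,194,304 elements, roughly four times what was needed, which is exactly the "buffer dominates the container memory" failure mode; the decay approach in the PR aims to start aggressive and settle near a conservative factor.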



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
