ZihanLi58 commented on code in PR #3787:
URL: https://github.com/apache/gobblin/pull/3787#discussion_r1334726623
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -28,55 +28,107 @@
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
/**
* A helper class to calculate the size of array buffers in a {@link
VectorizedRowBatch}.
* This estimate is mainly based on the maximum size of each variable length
column, which can be resized
* Since the resizing algorithm for each column can balloon, this can affect
likelihood of OOM
*/
+@Slf4j
public class OrcConverterMemoryManager {
+ private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+ private static final int DEFAULT_ENLARGE_FACTOR = 3;
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX = 5.0;
+ // Needs to be greater than 1.0
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN = 1.2;
+ // Given the defaults it will take around 500 records to reach the min
enlarge factor - given that the default
+ // batch size is 1000 this is a fairly conservative default
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR = 0.003;
+
private VectorizedRowBatch rowBatch;
+ private int resizeCount = 0;
+ private double smartArrayEnlargeFactorMax;
+ private double smartArrayEnlargeFactorMin;
+ private double smartArrayEnlargeDecayFactor;
+ private boolean enabledSmartSizing = false;
+ int enlargeFactor = 0;
- // TODO: Consider moving the resize algorithm from the converter to this
class
- OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+ OrcConverterMemoryManager(VectorizedRowBatch rowBatch, State state) {
this.rowBatch = rowBatch;
+ this.enabledSmartSizing =
state.getPropAsBoolean(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE,
DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
+ this.enlargeFactor =
state.getPropAsInt(GobblinOrcWriterConfigs.ENLARGE_FACTOR_KEY,
DEFAULT_ENLARGE_FACTOR);
+ this.smartArrayEnlargeFactorMax =
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX);
+ this.smartArrayEnlargeFactorMin =
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN);
+ this.smartArrayEnlargeDecayFactor =
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR);
+ if (enabledSmartSizing) {
+ Preconditions.checkArgument(this.smartArrayEnlargeFactorMax >= 1,
+ String.format("Smart array enlarge factor needs to be larger than
1.0, provided value %s", this.smartArrayEnlargeFactorMax));
+ Preconditions.checkArgument(this.smartArrayEnlargeFactorMin >= 1,
Review Comment:
If it's equal to one, will that ends up with resize does not work as
expected and for a specific record, we always are not able to hold it and keep
retry resizing?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -97,4 +149,19 @@ public long getConverterBufferTotalSize() {
return converterBufferTotalSize;
}
+ /**
+ * Resize the child-array size based on configuration.
+ * If smart resizing is enabled, it will use an exponential decay algorithm
where it would resize the array by a smaller amount
+ * the more records the converter has processed, as the fluctuation in
record size becomes less likely to differ significantly by then
+ * Since the writer is closed and reset periodically, this is generally a
safe assumption that should prevent large empty array buffers
+ */
+ public int resize(int rowsAdded, int requestedSize) {
+ resizeCount += 1;
+ log.info(String.format("It has been resized %s times in current writer",
resizeCount));
+ if (enabledSmartSizing) {
+ double decayingEnlargeFactor = this.smartArrayEnlargeFactorMax *
Math.pow((1-this.smartArrayEnlargeDecayFactor), rowsAdded-1);
Review Comment:
why do we use rowsAdded instead of resizeCount here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]