ZihanLi58 commented on code in PR #3751:
URL: https://github.com/apache/gobblin/pull/3751#discussion_r1306005049
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -44,21 +47,47 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
public static final String ORC_WRITER_PREFIX = "orcWriter.";
public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX +
"batchSize";
public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+ public static final String ORC_WRITER_AUTO_SELFTUNE_ENABLED =
ORC_WRITER_PREFIX + "auto.selfTune.enabled";
+ public static final String ORC_WRITER_ESTIMATED_RECORD_SIZE =
ORC_WRITER_PREFIX + "estimated.recordSize";
+ public static final String ORC_WRITER_AUTO_SELFTUNE_FREQUENCY =
ORC_WRITER_PREFIX + "auto.selfTune.frequency";
Review Comment:
Can you make the name more meaningful, e.g. indicate what 500 means
here?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,52 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
+ properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE,
String.valueOf(estimatedRecordSize));
Review Comment:
Do you plan to persist this value to the state store? It seems like we will redo
this run every time the container/job restarts. How expensive will that be? If
not, maybe we can live with the current implementation.
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -194,10 +295,30 @@ public void commit()
public void write(D record)
throws IOException {
Preconditions.checkState(!closed, "Writer already closed");
- valueWriter.write(record, rowBatch);
+ this.orcFileWriterLock.readLock().lock();
Review Comment:
Did you verify that the writer can be called concurrently?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java:
##########
@@ -184,6 +201,7 @@ public void addValue(int rowId, int column, Object data,
ColumnVector output) {
value = (byte[]) data;
}
((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+ return 1;
Review Comment:
Why return 1 here?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -68,29 +97,50 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D>
builder, State properties)
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema,
properties);
- this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE,
DEFAULT_ORC_WRITER_BATCH_SIZE);
+ this.selfTuningWriter =
properties.getPropAsBoolean(ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
+ this.batchSize = this.selfTuningWriter ?
properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE) :
DEFAULT_ORC_WRITER_BATCH_SIZE;
this.rowBatchPool = RowBatchPool.instance(properties);
this.enableRowBatchPool =
properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
+ this.selfTuneFrequency =
properties.getPropAsInt(ORC_WRITER_AUTO_SELFTUNE_FREQUENCY,
DEFAULT_ORC_AUTO_SELFTUNE_FREQUENCY);
+ // First tune is at 5
this.rowBatch = enableRowBatchPool ?
rowBatchPool.getRowBatch(typeDescription, batchSize) :
typeDescription.createRowBatch(batchSize);
- log.info("Created ORC writer, batch size: {}, {}: {}",
- batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
- properties.getProp(
- OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
- OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString()));
-
+ this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch);
// Create file-writer
- Configuration conf = new Configuration();
+ this.writerConfig = new Configuration();
// Populate job Configurations into Conf as well so that configurations
related to ORC writer can be tuned easily.
for (Object key : properties.getProperties().keySet()) {
- conf.set((String) key, properties.getProp((String) key));
+ this.writerConfig.set((String) key, properties.getProp((String) key));
}
-
- OrcFile.WriterOptions options =
OrcFile.writerOptions(properties.getProperties(), conf);
+ OrcFile.WriterOptions options =
OrcFile.writerOptions(properties.getProperties(), this.writerConfig);
options.setSchema(typeDescription);
- // For buffer-writer, flush has to be executed before close so it is
better we maintain the life-cycle of fileWriter
- // instead of delegating it to closer object in FsDataWriter.
- this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
+ // Get the amount of allocated and future space available
+ this.availableMemory = Runtime.getRuntime().maxMemory() -
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+ log.info("Available memory for ORC writer: {}", this.availableMemory);
+
+ if (this.selfTuningWriter) {
+ if (properties.contains(ORC_WRITER_ESTIMATED_RECORD_SIZE)) {
+ log.info("Using previously stored properties, ORC Estimated Record
size is : {}, estimated bytes converter allocated is : {}",
Review Comment:
Could we move this log statement after line 127 to reuse estimatedRecordSize and
estimatedBytesAllocatedConverterMemory?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -44,21 +47,47 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
public static final String ORC_WRITER_PREFIX = "orcWriter.";
public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX +
"batchSize";
public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+ public static final String ORC_WRITER_AUTO_SELFTUNE_ENABLED =
ORC_WRITER_PREFIX + "auto.selfTune.enabled";
+ public static final String ORC_WRITER_ESTIMATED_RECORD_SIZE =
ORC_WRITER_PREFIX + "estimated.recordSize";
+ public static final String ORC_WRITER_AUTO_SELFTUNE_FREQUENCY =
ORC_WRITER_PREFIX + "auto.selfTune.frequency";
+ public static final int DEFAULT_ORC_AUTO_SELFTUNE_FREQUENCY = 500;
+ public static final String
ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY = ORC_WRITER_PREFIX +
"estimated.bytes.allocated.converter.memory";
+ private static final int MAX_CONCURRENT_WRITERS_CONTAINER = 3;
Review Comment:
How do we know the value here? Can we calculate it at runtime?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -140,9 +190,17 @@ public State getFinalState() {
@Override
public void flush()
throws IOException {
- if (rowBatch.size > 0) {
- orcFileWriter.addRowBatch(rowBatch);
- rowBatch.reset();
+ try {
+ this.orcFileWriterLock.readLock().lock();
+ if (rowBatch.size > 0) {
+ if (this.orcFileWriter == null) {
Review Comment:
How can this happen? Do we lazily initialize the file writer?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]