Will-Lo commented on code in PR #3751:
URL: https://github.com/apache/gobblin/pull/3751#discussion_r1310539874
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,75 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
+    if (this.selfTuningWriter) {
+      properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(estimatedRecordSizeBytes));
+      properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+      properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.batchSize * calculateOrcFileWriterRowsBetweenCheck()));
+    }
}
  /**
+   * Modifies the size of the writer buffer based on the average size of the records written so far,
+   * the amount of available memory during initialization, and the number of concurrent writers.
+   * The new batch size is calculated as follows:
+   * 1. Memory available = (available memory during startup) / (concurrent writers) - (memory used by the ORC file writer)
+   * 2. Average record size, estimated during the Avro -> ORC conversion
+   * 3. Estimate of the memory used by the converter lists, since their internal buffers can grow large during a resize
+   * 4. New batch size = (memory available - estimated memory used by converter lists) / (average record size) * (memory usage factor)
+   * Generally in this writer, the memory the converter uses for large arrays is the leading cause of OOM in streaming,
+   * along with the records stored in the rowBatch.
+   * Another potential approach is to also check the available memory before resizing the converter lists,
+   * and to flush the batch whenever a resize is needed.
+   */
+  void tuneBatchSize(long averageSizePerRecord, int orcFileWriterRowsBetweenCheck) throws IOException {
+    this.estimatedBytesAllocatedConverterMemory = Math.max(this.estimatedBytesAllocatedConverterMemory,
+        this.converterMemoryManager.getConverterBufferTotalSize());
+    int currentConcurrentWriters = this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER,
+        CONCURRENT_WRITERS_DEFAULT);
+    // The native ORC writer implementation flushes the writer if its internal memory exceeds the size of a stripe after rows between check
+    // So in the worst case, the most memory the writer can hold is the size of a stripe plus (record size * number of records between checks)
+    // Note that this is an overestimate, as the native ORC file writer should achieve some compression ratio
+    long maxMemoryInFileWriter = this.estimatedRecordSizeBytes * orcFileWriterRowsBetweenCheck + DEFAULT_ORC_WRITER_STRIPE_SIZE;
+
+    int newBatchSize = (int) Math.round(((this.availableMemory / currentConcurrentWriters - maxMemoryInFileWriter
+        - this.estimatedBytesAllocatedConverterMemory) * this.batchSizeMemoryUsageFactor) / averageSizePerRecord);
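+    // Illustrative numbers (not from the PR, for intuition only): with 1 GiB available memory, 4 concurrent
+    // writers, ~5 MiB held by the ORC file writer, ~20 MiB of converter buffers, a 0.5 usage factor, and
+    // 1 KiB average records, this yields (1024 MiB / 4 - 5 MiB - 20 MiB) * 0.5 / 1 KiB ~= 118,000 rows,
+    // which the clamping below caps to [1, DEFAULT_ORC_WRITER_BATCH_SIZE].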
+    // Handle scenarios where the new batch size can be 0 or less due to overestimating the memory used by other components
+    newBatchSize = Math.min(Math.max(1, newBatchSize), DEFAULT_ORC_WRITER_BATCH_SIZE);
+    // TODO: Consider using a more sophisticated check to determine if the batch size should be changed
+    if (Math.abs(newBatchSize - this.batchSize) > 0.2 * this.batchSize) {
+      log.info("Tuning ORC writer batch size from {} to {} based on average byte size per record: {} with available memory {} and {} bytes of allocated memory in converter buffers, with {} partitioned writers",
+          batchSize, newBatchSize, averageSizePerRecord, availableMemory, estimatedBytesAllocatedConverterMemory, currentConcurrentWriters);
+      this.batchSize = newBatchSize;
+      // We only initialize the native ORC file writer once to avoid creating too many small files, as reconfiguring rows between memory check
+      // requires closing the writer and starting a new file
+      if (this.orcFileWriter == null) {
+        initializeOrcFileWriter();
+      }
+      this.flush();
+      this.rowBatch.ensureSize(this.batchSize);
+    }
+    if (this.orcFileWriter == null) {
Review Comment:
I think to handle every edge case (e.g., scenarios where low-volume writers get closed before the first tune), we should always lazily initialize inside the flush function instead; that way we won't need to duplicate this logic.
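
Roughly the shape I mean (a sketch only, not the PR's actual code; `orcFileWriter`, `initializeOrcFileWriter()`, and `flush()` are the names from this PR, and the rest of the flush body is elided):

```java
public void flush() throws IOException {
  // Lazily create the native ORC file writer on the first flush, so low-volume
  // writers that close before the first tune still get a writer, without
  // duplicating this null check at every call site.
  if (this.orcFileWriter == null) {
    initializeOrcFileWriter();
  }
  // ... existing flush logic: write the buffered rowBatch out to orcFileWriter ...
}
```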