This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new aa5db44e2 SAMZA-2778: Make AzureBlobOutputStream buffer initialization 
size configurable. (#1662)
aa5db44e2 is described below

commit aa5db44e25e87e84d7fc32dc50ba51347290acd8
Author: Eric Honer <[email protected]>
AuthorDate: Fri Jun 16 19:20:58 2023 -0700

    SAMZA-2778: Make AzureBlobOutputStream buffer initialization size 
configurable. (#1662)
    
    Symptom:
    JVM crashes due to OOM exceptions. When the system has a large number of 
AzureBlobWriterObjects, the memory becomes heavily fragmented and can be 
susceptible to crashing when a new AzureBlobOutputStream is created.
    
    Cause:
    The crashes are caused by inefficient memory management when using the 
G1GC. (The default garbage collector in Java 11+.) What causes (code paths) 
this issue and why this is a problem for the G1GC are examined separately.
    
    What causes this issue?
    The underlying issue is caused by the AzureBlobOutputStream. When a new 
instance is created, it creates a new ByteArrayOutputStream initialized to 
maxFlushThresholdSize. The ByteArrayOutputStream is the buffer used by the 
parent to accumulate messages between flush intervals. It requires 10MB 
(current default) of memory to initialize. This allows the buffer to accumulate 
messages without resize operations; however, it does not prevent resizing. The 
maxFlushThresholdSize is enforced  [...]
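
A minimal sketch of the buffer behavior described above, using only java.io.ByteArrayOutputStream. The class and method names (BufferGrowthSketch, writeAndMeasure) are hypothetical and are not part of Samza. It illustrates that the constructor argument is only a starting capacity: a small buffer grows on demand, while a large one reserves its full size up front even if little data is written.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration; not Samza code.
public class BufferGrowthSketch {

    // Creates a buffer with the given initial capacity, writes bytesToWrite
    // bytes into it, and returns how many bytes the buffer holds afterwards.
    public static int writeAndMeasure(int initialSize, int bytesToWrite) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(initialSize);
        byte[] chunk = new byte[bytesToWrite];
        // The buffer resizes itself when the write exceeds its capacity.
        buffer.write(chunk, 0, chunk.length);
        return buffer.size();
    }

    public static void main(String[] args) {
        // Starts at 32 bytes and grows to fit the data.
        System.out.println(writeAndMeasure(32, 1000));
        // Allocates 10MB immediately, although only 1000 bytes are stored.
        System.out.println(writeAndMeasure(10 * 1024 * 1024, 1000));
    }
}
```

Both calls store the same 1000 bytes; the difference is only how much memory is reserved when the stream is constructed.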
    
    Why is this a problem for the G1GC?
    The focus here is on the G1GC and humongous objects (G1 specific). The G1 
GC introduced a new memory management strategy that divides the heap into 
regions, -XX:G1HeapRegionSize=n. The GC can operate on regions concurrently and 
copies live objects between regions during full GC to reclaim/empty regions. 
The default behavior creates ~2048 regions, sized to a power of 2 between 1MB 
and 32MB. Any object larger than half of a region size is considered a 
humongous object.
    Humongous objects are allocated an entire region (or consecutive regions if 
larger than a single region) and are not copied between regions during full GC. 
Any space not used by the object within a region is non-addressable for the 
life of the object. A JVM heap size of 31GB, -Xmx31G, will default to 16MB 
regions. Considering the current default size is 10MB, each buffer requires an 
entire region and prevents the use of 6MB, regardless of how much data is 
in the buffer. For a hea [...]
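
The region arithmetic above can be sketched as follows. This is an illustration under stated assumptions, not JVM code: the class and method names are hypothetical, the humongous threshold follows the "larger than half of a region" rule as described in this message, and the small array object header is ignored.

```java
// Hypothetical illustration of the humongous-object arithmetic; not JVM code.
public class HumongousSketch {
    public static final long MB = 1024L * 1024L;

    // An object larger than half a region is treated as humongous.
    public static boolean isHumongous(long objectBytes, long regionBytes) {
        return objectBytes > regionBytes / 2;
    }

    // Humongous objects occupy whole regions, so round up.
    public static long regionsNeeded(long objectBytes, long regionBytes) {
        return (objectBytes + regionBytes - 1) / regionBytes;
    }

    // Space inside the occupied regions that nothing else can use.
    public static long unusableBytes(long objectBytes, long regionBytes) {
        return regionsNeeded(objectBytes, regionBytes) * regionBytes - objectBytes;
    }

    public static void main(String[] args) {
        long region = 16 * MB;   // region size used in the example above
        long buffer = 10 * MB;   // previous default buffer size
        System.out.println(isHumongous(buffer, region));
        System.out.println(unusableBytes(buffer, region) / MB + "MB unusable");
        System.out.println(isHumongous(32, region)); // new 32-byte default
    }
}
```

With a 16MB region, the 10MB buffer is humongous and strands 6MB per instance, while a 32-byte buffer is an ordinary allocation.
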
    The 10MB buffer size can exhaust the regions and cause OOMs or create 
fragmentation that causes OOMs. A fragmentation caused OOM occurs in the 
following sequence. On new, the JVM attempts to create the object in Eden. If 
there is insufficient space in Eden a minor GC is performed. If there is 
insufficient space after minor GC, the object is immediately promoted to Old Gen. 
If there is insufficient space in Old Gen, a full GC is performed. If a full GC 
cannot allocate memory or region(s)  [...]
    
    Changes:
    The javadocs, where appropriate, have been updated to reflect changes or 
describe new behaviors. No public APIs were removed; existing ones were marked 
deprecated and migrated to the new default initialization value. All of the 
changes are itemized below.
    
    AzureBlobConfig
    Adding two new public fields and one public method. The new configuration 
is made accessible in the same manner as existing configs (see #configs 
SEP-26) and is consistent with the coding guide. There is a new public config key: 
SYSTEM_INIT_BUFFER_SIZE - named initBufferSize.bytes. The default value is 
public field SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The user provided configuration 
value is accessible with new public method getInitBufferSizeBytes(..). The 
method returns the configuration value [...]
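
A standalone sketch of the clamping that getInitBufferSizeBytes(..) performs, mirroring the Math.min/Math.max expression in the AzureBlobConfig change in this commit. The class name InitBufferSizeClamp and the method clamp are hypothetical stand-ins; the real method reads both values from the job config.

```java
// Hypothetical standalone version of the clamping logic; not the Samza class.
public class InitBufferSizeClamp {
    // Mirrors AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT.
    public static final int DEFAULT_INIT_BUFFER_SIZE = 32;

    // Returns the configured value, raised to the default if it is smaller
    // and capped at maxFlushThresholdSize if it is larger.
    public static int clamp(int configured, int maxFlushThresholdSize) {
        return Math.min(Math.max(configured, DEFAULT_INIT_BUFFER_SIZE), maxFlushThresholdSize);
    }

    public static void main(String[] args) {
        int maxFlush = 10485760; // default maxFlushThresholdSize (10MB)
        System.out.println(clamp(0, maxFlush));        // below the default
        System.out.println(clamp(1024, maxFlush));     // within range
        System.out.println(clamp(20000000, maxFlush)); // above the threshold
    }
}
```

Out-of-range values are silently adjusted rather than rejected, so a misconfigured systems.<system-name>.azureblob.initBufferSize.bytes does not fail the job.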
    
    AzureBlobWriterFactory
    There are two changes to this interface, both to the method, 
getWriterInstance(..). The existing implementation is marked @Deprecated and a 
new method with an additional parameter is added. The new parameter is an int 
that is expected to be the _initBufferSize_.
    
    AzureBlobAvroWriterFactory
    The modifications here are consistent with the changes to interface 
AzureBlobWriterFactory. However, the deprecated implementation uses the new 
field AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT when calling the new 
public API. This will migrate users to the new initialization behavior.
    
    AzureBlobAvroWriter
    There are two public, one package-private, and two private changes. Both 
public changes are to constructors. The existing public constructor is marked 
deprecated and invokes the new public constructor with the new field 
AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The new constructor sets the 
new private field initBufferSize. The package-private constructor is modified 
with an additional int parameter and the tests were changed accordingly. The 
remaining private change modifies cr [...]
    
    AzureBlobOutputStream
    The existing public API is marked @Deprecated and the ByteArrayOutputStream 
is initialized with the new field 
AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The new public constructor 
includes the new parameter initBufferSize and initializes the 
ByteArrayOutputStream to that size.
---
 .../versioned/jobs/samza-configurations.md         |  1 +
 .../samza/system/azureblob/AzureBlobConfig.java    | 12 +++++++++++
 .../system/azureblob/avro/AzureBlobAvroWriter.java | 22 ++++++++++++++++++--
 .../azureblob/avro/AzureBlobAvroWriterFactory.java |  5 +++--
 .../azureblob/avro/AzureBlobOutputStream.java      |  9 ++++----
 .../producer/AzureBlobSystemProducer.java          |  3 ++-
 .../azureblob/producer/AzureBlobWriterFactory.java |  3 ++-
 .../azureblob/avro/TestAzureBlobAvroWriter.java    | 24 ++++++++++++----------
 .../azureblob/avro/TestAzureBlobOutputStream.java  |  4 +++-
 9 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md 
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 790e289ec..60d8ae8b5 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -267,6 +267,7 @@ Configs for producing to [Azure Blob 
Storage](https://azure.microsoft.com/en-us/
 |systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 
MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size 
allowed by Azure is 100MB.|
 |systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE 
(unlimited)|max size of the uncompressed blob in bytes.<br>If default value 
then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB 
per block X 50,000 blocks).|
 |systems.**_system-name_**.azureblob.maxMessagesPerBlob|Long.MAX_VALUE 
(unlimited)|max number of messages per blob.|
+|systems.**_system-name_**.azureblob.initBufferSize.bytes|32|The amount of 
memory pre-allocated for 
`org.apache.samza.system.azureblob.avro.AzureBlobOutputStream` 
buffers.<br>Values must be between 32 (default) and `maxBlobSize`.<br>This 
should be increased for fast-filling buffers when buffer resize operations 
affect performance. Large values can lead to inefficent memory allocation with 
the G1 garbage collector. If the size is >= half of a region size, 
`G1HeapRegionSize`, consider swit [...]
 |systems.**_system-name_**.azureblob.threadPoolCount|2|number of threads for 
the asynchronous uploading of blocks.|
 |systems.**_system-name_**.azureblob.blockingQueueSize|Thread Pool Count * 
2|size of the queue to hold blocks ready to be uploaded by asynchronous 
threads.<br>If all threads are busy uploading then blocks are queued and if 
queue is full then main thread will start uploading which will block processing 
of incoming messages.|
 |systems.**_system-name_**.azureblob.flushTimeoutMs|180000 (3 mins)|timeout to 
finish uploading all blocks before committing a blob.|
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
index 3026a89af..37c6ebcf3 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
@@ -80,6 +80,12 @@ public class AzureBlobConfig extends MapConfig {
   public static final String SYSTEM_MAX_FLUSH_THRESHOLD_SIZE = 
SYSTEM_AZUREBLOB_PREFIX + "maxFlushThresholdSize";
   private static final int SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT = 10485760;
 
+  // initialization size of in-memory OutputStream
+  // This value should be between SYSTEM_INIT_BUFFER_SIZE_DEFAULT and 
getMaxFlushThresholdSize() exclusive.
+  public static final String SYSTEM_INIT_BUFFER_SIZE = SYSTEM_AZUREBLOB_PREFIX 
+ "initBufferSize.bytes";
+  // re-use size for parameterless constructor java.io.ByteArrayOutputStream()
+  public static final int SYSTEM_INIT_BUFFER_SIZE_DEFAULT = 32;
+
   // maximum size of uncompressed blob in bytes
   public static final String SYSTEM_MAX_BLOB_SIZE = SYSTEM_AZUREBLOB_PREFIX + 
"maxBlobSize";
   private static final long SYSTEM_MAX_BLOB_SIZE_DEFAULT = Long.MAX_VALUE; // 
unlimited
@@ -170,6 +176,12 @@ public class AzureBlobConfig extends MapConfig {
     return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), 
SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
   }
 
+  // return larger of config value or DEFAULT and smaller of 
MaxFlushThresholdSize
+  public int getInitBufferSizeBytes(String systemName) {
+    int init = getInt(String.format(SYSTEM_INIT_BUFFER_SIZE, systemName), 
SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
+    return Math.min(Math.max(init, SYSTEM_INIT_BUFFER_SIZE_DEFAULT), 
getMaxFlushThresholdSize(systemName));
+  }
+
   public int getAzureBlobThreadPoolCount(String systemName) {
     return getInt(String.format(SYSTEM_THREAD_POOL_COUNT, systemName), 
SYSTEM_THREAD_POOL_COUNT_DEFAULT);
   }
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index b2e8c3ebf..e50193669 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -23,6 +23,7 @@ import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -114,6 +115,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter 
{
   private final String blobURLPrefix;
   private final long maxBlobSize;
   private final long maxRecordsPerBlob;
+  private final int initBufferSize;
   private final boolean useRandomStringInBlobName;
   private final Object currentDataFileWriterLock = new Object();
   private volatile long recordsInCurrentBlob = 0;
@@ -121,12 +123,24 @@ public class AzureBlobAvroWriter implements 
AzureBlobWriter {
   private Config blobMetadataGeneratorConfig;
   private String streamName;
 
+  @Deprecated
   public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, 
String blobURLPrefix,
       Executor blobThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config 
blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression 
compression, boolean useRandomStringInBlobName,
       long maxBlobSize, long maxRecordsPerBlob) {
 
+    this(containerAsyncClient, blobURLPrefix, blobThreadPool, metrics, 
blobMetadataGeneratorFactory,
+        blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, 
flushTimeoutMs, compression,
+        useRandomStringInBlobName, maxBlobSize, maxRecordsPerBlob, 
AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
+  }
+
+  public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, 
String blobURLPrefix,
+      Executor blobThreadPool, AzureBlobWriterMetrics metrics,
+      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config 
blobMetadataGeneratorConfig, String streamName,
+      int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression 
compression, boolean useRandomStringInBlobName,
+      long maxBlobSize, long maxRecordsPerBlob, int initBufferSize) {
+
     this.blobThreadPool = blobThreadPool;
     this.metrics = metrics;
     this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
@@ -140,6 +154,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter 
{
     this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
     this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
     this.streamName = streamName;
+    this.initBufferSize = initBufferSize;
   }
 
   /**
@@ -244,7 +259,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter 
{
       DataFileWriter<Object> dataFileWriter,
       AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient 
blockBlobAsyncClient,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config 
blobMetadataGeneratorConfig, String streamName,
-      long maxBlobSize, long maxRecordsPerBlob, Compression compression, 
boolean useRandomStringInBlobName) {
+      long maxBlobSize, long maxRecordsPerBlob, Compression compression, 
boolean useRandomStringInBlobName,
+      int initBufferSize) {
+
     if (dataFileWriter == null || azureBlobOutputStream == null || 
blockBlobAsyncClient == null) {
       this.currentBlobWriterComponents = null;
     } else {
@@ -265,6 +282,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter 
{
     this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
     this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
     this.streamName = streamName;
+    this.initBufferSize = initBufferSize;
   }
 
   @VisibleForTesting
@@ -351,7 +369,7 @@ public class AzureBlobAvroWriter implements AzureBlobWriter 
{
     try {
       azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, 
blobThreadPool, metrics,
           blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
-          streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
+          streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression, 
initBufferSize);
     } catch (Exception e) {
       throw new SamzaException("Unable to create AzureBlobOutputStream", e);
     }
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
index 0a4e01908..6434145ad 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriterFactory.java
@@ -35,13 +35,14 @@ public class AzureBlobAvroWriterFactory implements 
AzureBlobWriterFactory {
   /**
    * {@inheritDoc}
    */
+  @Override
   public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient 
containerAsyncClient, String blobURL,
       Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config 
blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression 
compression, boolean useRandomStringInBlobName,
-      long maxBlobSize, long maxMessagesPerBlob) throws IOException {
+      long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws 
IOException {
     return new AzureBlobAvroWriter(containerAsyncClient, blobURL, 
blobUploadThreadPool, metrics,
           blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, 
streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
-          compression, useRandomStringInBlobName, maxBlobSize, 
maxMessagesPerBlob);
+          compression, useRandomStringInBlobName, maxBlobSize, 
maxMessagesPerBlob, initBufferSize);
   }
 }
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 0e05aefa6..51d983cfd 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -51,7 +51,8 @@ import reactor.core.scheduler.Schedulers;
 
 /**
  * This class extends {@link java.io.OutputStream} and uses {@link 
java.io.ByteArrayOutputStream}
- * for caching the write calls till upload is not called.
+ * for caching the write calls till upload is not called. The initialization 
size of the
+ * underlying {@link java.io.ByteArrayOutputStream} can be set by the caller 
or from config.
  *
  * It asynchronously uploads the blocks and waits on them to finish at close.
  * The blob is persisted at close.
@@ -105,13 +106,13 @@ public class AzureBlobOutputStream extends OutputStream {
    * @param flushTimeoutMs timeout for uploading a block
    * @param maxBlockFlushThresholdSize max block size
    * @param compression type of compression to be used before uploading a block
+   * @param initBufferSize initial size of {@link ByteArrayOutputStream}
    */
   public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor 
blobThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config 
blobMetadataGeneratorConfig, String streamName,
-      long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression 
compression) {
+      long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression 
compression, int initBufferSize) {
     this(blobAsyncClient, blobThreadPool, metrics, 
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
-        flushTimeoutMs, maxBlockFlushThresholdSize,
-        new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
+        flushTimeoutMs, maxBlockFlushThresholdSize, new 
ByteArrayOutputStream(initBufferSize), compression);
   }
 
   /**
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 91927d272..2774dba41 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -499,7 +499,8 @@ public class AzureBlobSystemProducer implements 
SystemProducer {
           
CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
           config.getSuffixRandomStringToBlobName(systemName),
           config.getMaxBlobSize(systemName),
-          config.getMaxMessagesPerBlob(systemName));
+          config.getMaxMessagesPerBlob(systemName),
+          config.getInitBufferSizeBytes(systemName));
     } catch (Exception e) {
       throw new RuntimeException("Failed to create a writer for the 
producer.", e);
     }
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
index e9fa16d2a..a3d7a1c33 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobWriterFactory.java
@@ -38,6 +38,7 @@ public interface AzureBlobWriterFactory {
    * @param streamName name of the stream that this AzureBlobWriter is 
associated with
    * @param maxBlockFlushThresholdSize threshold at which to upload
    * @param flushTimeoutMs timeout after which the flush is abandoned
+   * @param initBufferSize initial size of in-memory buffer(s)
    * @return AzureBlobWriter instance
    * @throws IOException if writer creation fails
    */
@@ -45,5 +46,5 @@ public interface AzureBlobWriterFactory {
       Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
       BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config 
blobMetadataGeneratorConfig, String streamName,
       int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression 
compression, boolean useRandomStringInBlobName,
-      long maxBlobSize, long maxMessagesPerBlob) throws IOException;
+      long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws 
IOException;
 }
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 3f7cefc99..8cca792a8 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -22,6 +22,7 @@ package org.apache.samza.system.azureblob.avro;
 import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.compression.CompressionFactory;
 import org.apache.samza.system.azureblob.compression.CompressionType;
@@ -89,6 +90,7 @@ public class TestAzureBlobAvroWriter {
   private static final String VALUE = "FAKE_VALUE";
   private static final String SYSTEM_NAME = "FAKE_SYSTEM";
   private static final int THRESHOLD = 100;
+  private static final int INIT_SIZE = 
AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT;
 
   private class SpecificRecordEvent extends 
org.apache.avro.specific.SpecificRecordBase
       implements org.apache.avro.specific.SpecificRecord {
@@ -181,7 +183,7 @@ public class TestAzureBlobAvroWriter {
         spy(new AzureBlobAvroWriter(mockContainerAsyncClient, 
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
             60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, 
mockBlockBlobAsyncClient,
             blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, 
STREAM_NAME,
-            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // 
keeping blob size and number of records unlimited
+            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, 
INIT_SIZE)); // keeping blob size and number of records unlimited
     
doReturn(encodedRecord).when(azureBlobAvroWriter).encodeRecord((IndexedRecord) 
ome.getMessage());
   }
   @Test
@@ -236,7 +238,7 @@ public class TestAzureBlobAvroWriter {
         spy(new 
AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), 
mock(AzureBlobWriterMetrics.class),
             threadPool, THRESHOLD, 60000, "test",
             null, null, null, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-            1000, 100, mockCompression, false));
+            1000, 100, mockCompression, false, INIT_SIZE));
     OutgoingMessageEnvelope omeEncoded = new OutgoingMessageEnvelope(new 
SystemStream(SYSTEM_NAME, "Topic1"), new byte[100]);
     azureBlobAvroWriter.write(omeEncoded);
   }
@@ -289,7 +291,7 @@ public class TestAzureBlobAvroWriter {
         spy(new 
AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), 
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
             60000, "test", null, null, null,
             blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, 
STREAM_NAME,
-            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // 
keeping blob size and number of records unlimited
+            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, 
INIT_SIZE)); // keeping blob size and number of records unlimited
     when(azureBlobAvroWriter.encodeRecord((IndexedRecord) 
ome.getMessage())).thenThrow(IllegalStateException.class);
     azureBlobAvroWriter.flush(); // No NPE because has null check for 
currentBlobWriterComponents
   }
@@ -304,7 +306,7 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
         null, null, null, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-        maxBlobSize, 10, mockCompression, true));
+        maxBlobSize, 10, mockCompression, true, INIT_SIZE));
 
     DataFileWriter<Object> mockDataFileWriter1 = (DataFileWriter<Object>) 
mock(DataFileWriter.class);
     
PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -317,7 +319,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream1 = 
mock(AzureBlobOutputStream.class);
     
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1,
 threadPool,
         mockMetrics, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, 
mockCompression).thenReturn(mockAzureBlobOutputStream1);
+        (long) 60000, THRESHOLD, mockCompression, 
INIT_SIZE).thenReturn(mockAzureBlobOutputStream1);
     when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) maxBlobSize - 
1);
 
     // first OME creates the first blob
@@ -335,7 +337,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream2 = 
mock(AzureBlobOutputStream.class);
     
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2,
 threadPool,
         mockMetrics, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, 
mockCompression).thenReturn(mockAzureBlobOutputStream2);
+        (long) 60000, THRESHOLD, mockCompression, 
INIT_SIZE).thenReturn(mockAzureBlobOutputStream2);
     when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) maxBlobSize - 
1);
 
     // Second OME creates the second blob because maxBlobSize is 1000 and 
mockAzureBlobOutputStream.getSize is 999.
@@ -367,7 +369,7 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
         null, null, null, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-        maxBlobSize, maxRecordsPerBlob, mockCompression, true));
+        maxBlobSize, maxRecordsPerBlob, mockCompression, true, INIT_SIZE));
 
     DataFileWriter<Object> mockDataFileWriter1 = (DataFileWriter<Object>) 
mock(DataFileWriter.class);
     
PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
@@ -380,7 +382,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream1 = 
mock(AzureBlobOutputStream.class);
     
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1,
 threadPool,
         mockMetrics, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, 
mockCompression).thenReturn(mockAzureBlobOutputStream1);
+        (long) 60000, THRESHOLD, mockCompression, 
INIT_SIZE).thenReturn(mockAzureBlobOutputStream1);
     when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) 1);
 
     // first OME creates the first blob and 11th OME (ome2) creates the second 
blob.
@@ -401,7 +403,7 @@ public class TestAzureBlobAvroWriter {
     AzureBlobOutputStream mockAzureBlobOutputStream2 = 
mock(AzureBlobOutputStream.class);
     
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2,
 threadPool,
         mockMetrics, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-        (long) 60000, THRESHOLD, 
mockCompression).thenReturn(mockAzureBlobOutputStream2);
+        (long) 60000, THRESHOLD, mockCompression, 
INIT_SIZE).thenReturn(mockAzureBlobOutputStream2);
     when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) 1);
 
     azureBlobAvroWriter.write(ome2);
@@ -430,7 +432,7 @@ public class TestAzureBlobAvroWriter {
     azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
         mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD, 60000, 
blobUrlPrefix,
         mockDataFileWriter, mockAzureBlobOutputStream, 
mockBlockBlobAsyncClient, blobMetadataGeneratorFactory, 
blobMetadataGeneratorConfig, STREAM_NAME,
-        maxBlobSize, maxRecordsPerBlob, mockCompression, false));
+        maxBlobSize, maxRecordsPerBlob, mockCompression, false, INIT_SIZE));
 
     DataFileWriter<Object> mockDataFileWriter2 = mock(DataFileWriter.class);
     AzureBlobOutputStream mockAzureBlobOutputStream2 = 
mock(AzureBlobOutputStream.class);
@@ -457,7 +459,7 @@ public class TestAzureBlobAvroWriter {
         mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
         60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, 
mockBlockBlobAsyncClient,
         blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
-        Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false));
+        Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, INIT_SIZE));
     IndexedRecord record = new GenericRecordEvent();
     Assert.assertTrue(Arrays.equals(encodeRecord(record), 
azureBlobAvroWriter.encodeRecord(record)));
   }
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index e4f3cc56a..19a802a20 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 import org.apache.samza.AzureException;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
@@ -75,6 +76,7 @@ public class TestAzureBlobOutputStream {
   private ThreadPoolExecutor threadPool;
   private ByteArrayOutputStream mockByteArrayOutputStream;
   private static final int THRESHOLD = 100;
+  private static final int INIT_SIZE = 
AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT;
   private BlockBlobAsyncClient mockBlobAsyncClient;
   private AzureBlobOutputStream azureBlobOutputStream;
   private static final String RANDOM_STRING = 
"roZzozzLiR7GCEjcB0UsRUNgBAip8cSLGXQSo3RQvbIDoxOaaRs4hrec2s5rMPWgTPRY4UnE959worEtyhRjwUFnRnVuNFZ554yuPQCbI69qFkQX7MmrB4blmpSnFeGjWKjFjIRLFNVSsQBYMkr5jT4T83uVtuGumsjACVrpcilihdd194H8Y71rQcrXZoTQtw5OvmPicbwptawpHoRNzHihyaDVYgAs0dQbvVEu1gitKpamzYdMLFtc5h8PFZSVEB";
@@ -98,7 +100,7 @@ public class TestAzureBlobOutputStream {
         TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
 
 
-    mockByteArrayOutputStream = spy(new ByteArrayOutputStream(THRESHOLD));
+    mockByteArrayOutputStream = spy(new ByteArrayOutputStream(INIT_SIZE));
 
     mockBlobAsyncClient = PowerMockito.mock(BlockBlobAsyncClient.class);
 
