This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b923af1  [Functions] Support KEY_BASED batch builder for Java based 
functions and sources (#11706)
b923af1 is described below

commit b923af16f629bf298ee2e8fec44864a2c8a2615b
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Aug 25 20:33:50 2021 +0300

    [Functions] Support KEY_BASED batch builder for Java based functions and 
sources (#11706)
    
    * [Functions] Support KEY_BASED batch builder for Java based functions and 
sources
    
    * Include batchBuilder in ProducerSpec -> 
ProducerConfig.ProducerConfigBuilder conversion
    
    * Support setting batch builder for sources with "--batch-builder 
KEY_BASED" argument
---
 .../pulsar/functions/instance/ContextImpl.java     | 21 ++++++++++-----
 .../functions/instance/JavaInstanceRunnable.java   |  1 +
 .../pulsar/functions/utils/SourceConfigUtils.java  |  7 +++++
 .../functions/utils/SourceConfigUtilsTest.java     | 31 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 957cc44..a57d53b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
 import lombok.ToString;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
@@ -146,14 +147,22 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
         this.producerBuilder = (ProducerBuilderImpl<?>) 
client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
         boolean useThreadLocalProducers = false;
-        if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
-            if 
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages()
 != 0) {
-                
this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
+        Function.ProducerSpec producerSpec = 
config.getFunctionDetails().getSink().getProducerSpec();
+        if (producerSpec != null) {
+            if (producerSpec.getMaxPendingMessages() != 0) {
+                
this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
             }
-            if 
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()
 != 0) {
-                
this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+            if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
+                
this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
             }
-            useThreadLocalProducers = 
config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
+            if (producerSpec.getBatchBuilder() != null) {
+                if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
+                    
this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+                } else {
+                    
this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
+                }
+            }
+            useThreadLocalProducers = 
producerSpec.getUseThreadLocalProducers();
         }
         if (useThreadLocalProducers) {
             tlPublishProducers = new ThreadLocal<>();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 705b920..614b7a4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -782,6 +782,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                     ProducerConfig.ProducerConfigBuilder builder = 
ProducerConfig.builder()
                             .maxPendingMessages(conf.getMaxPendingMessages())
                             
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
+                            .batchBuilder(conf.getBatchBuilder())
                             
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
                             
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
                     pulsarSinkConfig.setProducerConfig(builder.build());
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 9049eb6..6450d6e 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -167,6 +167,13 @@ public class SourceConfigUtils {
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
 
+        if (sourceConfig.getBatchBuilder() != null) {
+            Function.ProducerSpec.Builder builder = 
sinkSpecBuilder.getProducerSpec() != null
+                    ? sinkSpecBuilder.getProducerSpec().toBuilder()
+                    : Function.ProducerSpec.newBuilder();
+            
sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
+        }
+
         sinkSpecBuilder.setForwardSourceMessageProperty(true);
 
         functionDetailsBuilder.setSink(sinkSpecBuilder);
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 20a64f8..22b5afa 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -331,6 +331,37 @@ public class SourceConfigUtilsTest extends 
PowerMockTestCase {
         assertTrue(e.getMessage().contains("Could not validate source config: 
Field 'configParameter' cannot be null!"));
     }
 
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setProducerConfig(null);
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new 
SourceConfigUtils.ExtractedSourceDetails(null, null));
+        
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), 
"KEY_BASED");
+    }
+
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigExists() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new 
SourceConfigUtils.ExtractedSourceDetails(null, null));
+        
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), 
"KEY_BASED");
+        
assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(),
 123456);
+    }
+
+    @Test
+    public void 
testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined()
 {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder(null);
+        sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new 
SourceConfigUtils.ExtractedSourceDetails(null, null));
+        
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), 
"KEY_BASED");
+    }
+
     private SourceConfig createSourceConfigWithBatch() {
         SourceConfig sourceConfig = createSourceConfig();
         BatchSourceConfig batchSourceConfig = createBatchSourceConfig();

Reply via email to