This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b923af1 [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706) b923af1 is described below commit b923af16f629bf298ee2e8fec44864a2c8a2615b Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Wed Aug 25 20:33:50 2021 +0300 [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706) * [Functions] Support KEY_BASED batch builder for Java based functions and sources * Include batchBuilder in ProducerSpec -> ProducerConfig.ProducerConfigBuilder conversion * Support setting batch builder for sources with "--batch-builder KEY_BASED" argument --- .../pulsar/functions/instance/ContextImpl.java | 21 ++++++++++----- .../functions/instance/JavaInstanceRunnable.java | 1 + .../pulsar/functions/utils/SourceConfigUtils.java | 7 +++++ .../functions/utils/SourceConfigUtilsTest.java | 31 ++++++++++++++++++++++ 4 files changed, 54 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 957cc44..a57d53b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -39,6 +39,7 @@ import java.util.stream.Stream; import lombok.ToString; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; @@ -146,14 +147,22 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable this.producerBuilder = (ProducerBuilderImpl<?>) 
client.newProducer().blockIfQueueFull(true).enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); boolean useThreadLocalProducers = false; - if (config.getFunctionDetails().getSink().getProducerSpec() != null) { - if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) { - this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages()); + Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec(); + if (producerSpec != null) { + if (producerSpec.getMaxPendingMessages() != 0) { + this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages()); } - if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { - this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()); + if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) { + this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions()); } - useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers(); + if (producerSpec.getBatchBuilder() != null) { + if (producerSpec.getBatchBuilder().equals("KEY_BASED")) { + this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED); + } else { + this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT); + } + } + useThreadLocalProducers = producerSpec.getUseThreadLocalProducers(); } if (useThreadLocalProducers) { tlPublishProducers = new ThreadLocal<>(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 705b920..614b7a4 100644 --- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -782,6 +782,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder() .maxPendingMessages(conf.getMaxPendingMessages()) .maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions()) + .batchBuilder(conf.getBatchBuilder()) .useThreadLocalProducers(conf.getUseThreadLocalProducers()) .cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec())); pulsarSinkConfig.setProducerConfig(builder.build()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 9049eb6..6450d6e 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -167,6 +167,13 @@ public class SourceConfigUtils { sinkSpecBuilder.setProducerSpec(pbldr.build()); } + if (sourceConfig.getBatchBuilder() != null) { + Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null + ? 
sinkSpecBuilder.getProducerSpec().toBuilder() + : Function.ProducerSpec.newBuilder(); + sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build()); + } + sinkSpecBuilder.setForwardSourceMessageProperty(true); functionDetailsBuilder.setSink(sinkSpecBuilder); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index 20a64f8..22b5afa 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -331,6 +331,37 @@ public class SourceConfigUtilsTest extends PowerMockTestCase { assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!")); } + @Test + public void testSupportsBatchBuilderWhenProducerConfigIsNull() { + SourceConfig sourceConfig = createSourceConfig(); + sourceConfig.setProducerConfig(null); + sourceConfig.setBatchBuilder("KEY_BASED"); + Function.FunctionDetails functionDetails = + SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null)); + assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED"); + } + + @Test + public void testSupportsBatchBuilderWhenProducerConfigExists() { + SourceConfig sourceConfig = createSourceConfig(); + sourceConfig.setBatchBuilder("KEY_BASED"); + sourceConfig.getProducerConfig().setMaxPendingMessages(123456); + Function.FunctionDetails functionDetails = + SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null)); + assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED"); + assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(), 123456); + } + + @Test + public void 
testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined() { + SourceConfig sourceConfig = createSourceConfig(); + sourceConfig.setBatchBuilder(null); + sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED"); + Function.FunctionDetails functionDetails = + SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null)); + assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED"); + } + private SourceConfig createSourceConfigWithBatch() { SourceConfig sourceConfig = createSourceConfig(); BatchSourceConfig batchSourceConfig = createBatchSourceConfig();