This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fad925f7c52 [improve][fn] Implement pip 401: Support set batching
configurations for Pulsar Functions&Sources (#23860)
fad925f7c52 is described below
commit fad925f7c522a30ba9fa005e5788162dd17b21ab
Author: jiangpengcheng <[email protected]>
AuthorDate: Wed Mar 12 13:56:42 2025 +0800
[improve][fn] Implement pip 401: Support set batching configurations for
Pulsar Functions&Sources (#23860)
---
.../{ProducerConfig.java => BatchingConfig.java} | 18 +-
.../pulsar/common/functions/ProducerConfig.java | 1 +
.../functions/instance/JavaInstanceRunnable.java | 2 +
.../functions/instance/ProducerBuilderFactory.java | 29 +++
.../instance/ProducerBuilderFactoryTest.java | 60 +++++
.../proto/src/main/proto/Function.proto | 10 +
.../pulsar/functions/utils/BatchingUtils.java | 86 +++++++
.../functions/utils/FunctionConfigUtils.java | 6 +
.../pulsar/functions/utils/BatchingUtilsTest.java | 70 ++++++
.../integration/functions/PulsarFunctionsTest.java | 64 +++++-
.../functions/java/PulsarFunctionsJavaTest.java | 31 +++
.../python/PulsarFunctionsPythonTest.java | 4 +-
.../functions/utils/CommandGenerator.java | 5 +
.../io/sources/DataGeneratorSourceTest.java | 248 +++++++++++++++++++++
.../integration/topologies/PulsarCluster.java | 15 ++
.../topologies/PulsarStandaloneTestBase.java | 12 +-
.../src/test/resources/pulsar-io-sources.xml | 1 +
17 files changed, 641 insertions(+), 21 deletions(-)
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java
similarity index 74%
copy from
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
copy to
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java
index 25ca2ad79c8..1133e8e7e94 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java
@@ -23,21 +23,19 @@ import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
-import org.apache.pulsar.client.api.CompressionType;
-/**
- * Configuration of the producer inside the function.
- */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
-public class ProducerConfig {
- private Integer maxPendingMessages;
- private Integer maxPendingMessagesAcrossPartitions;
- private Boolean useThreadLocalProducers;
- private CryptoConfig cryptoConfig;
+public class BatchingConfig {
+ @Builder.Default
+ private boolean enabled = true;
+ @Builder.Default
+ private Integer batchingMaxPublishDelayMs = 10;
+ private Integer roundRobinRouterBatchingPartitionSwitchFrequency;
+ private Integer batchingMaxMessages;
+ private Integer batchingMaxBytes;
private String batchBuilder;
- private CompressionType compressionType;
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index 25ca2ad79c8..483707da2dc 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -40,4 +40,5 @@ public class ProducerConfig {
private CryptoConfig cryptoConfig;
private String batchBuilder;
private CompressionType compressionType;
+ private BatchingConfig batchingConfig;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cfb7e9536a3..61254a1748b 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -101,6 +101,7 @@ import
org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSource;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
+import org.apache.pulsar.functions.utils.BatchingUtils;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -1050,6 +1051,7 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
.batchBuilder(conf.getBatchBuilder())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()))
+
.batchingConfig(BatchingUtils.convertFromSpec(conf.getBatchingSpec()))
.compressionType(FunctionCommon.convertFromFunctionDetailsCompressionType(
conf.getCompressionType()));
pulsarSinkConfig.setProducerConfig(builder.build());
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
index b08f7f3f2cb..cec0ee91e60 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
@@ -116,6 +116,35 @@ public class ProducerBuilderFactory {
builder.batcherBuilder(BatcherBuilder.DEFAULT);
}
}
+ if (producerConfig.getBatchingConfig() != null) {
+
builder.enableBatching(producerConfig.getBatchingConfig().isEnabled());
+ if
(producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() != null
+ &&
producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() > 0) {
+
builder.batchingMaxPublishDelay(producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ if
(producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency()
!= null
+ &&
producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency()
+ > 0) {
+ builder.roundRobinRouterBatchingPartitionSwitchFrequency(
+
producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency());
+ }
+ if
(producerConfig.getBatchingConfig().getBatchingMaxMessages() != null
+ &&
producerConfig.getBatchingConfig().getBatchingMaxMessages() > 0) {
+
builder.batchingMaxMessages(producerConfig.getBatchingConfig().getBatchingMaxMessages());
+ }
+ if (producerConfig.getBatchingConfig().getBatchingMaxBytes()
!= null
+ &&
producerConfig.getBatchingConfig().getBatchingMaxBytes() > 0) {
+
builder.batchingMaxBytes(producerConfig.getBatchingConfig().getBatchingMaxBytes());
+ }
+ if (producerConfig.getBatchingConfig().getBatchBuilder() !=
null) {
+ if
(producerConfig.getBatchingConfig().getBatchBuilder().equals("KEY_BASED")) {
+ builder.batcherBuilder(BatcherBuilder.KEY_BASED);
+ } else {
+ builder.batcherBuilder(BatcherBuilder.DEFAULT);
+ }
+ }
+ }
}
return builder;
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
index 42940f7e2da..a9bb0b21185 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.testng.Assert.assertEquals;
@@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.functions.BatchingConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.mockito.internal.util.MockUtil;
@@ -139,6 +141,62 @@ public class ProducerBuilderFactoryTest {
cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value"));
cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName());
producerConfig.setCryptoConfig(cryptoConfig);
+ BatchingConfig batchingConfig = new BatchingConfig();
+ batchingConfig.setEnabled(true);
+ batchingConfig.setBatchingMaxPublishDelayMs(20);
+ batchingConfig.setBatchingMaxMessages(100);
+ batchingConfig.setBatchingMaxBytes(-1);
+ producerConfig.setBatchingConfig(batchingConfig);
+ ProducerBuilderFactory builderFactory = new
ProducerBuilderFactory(pulsarClient, producerConfig, null, null);
+ builderFactory.createProducerBuilder("topic", Schema.STRING,
"producerName");
+
+ verify(pulsarClient).newProducer(Schema.STRING);
+ verify(producerBuilder).blockIfQueueFull(true);
+ // enableBatching will be called twice here:
+ // the first time is called by default to keep the backward compatibility
+ // the second call is called when the producerConfig and
producerConfig.batchingConfig are not null
+ verify(producerBuilder, times(2)).enableBatching(true);
+ verify(producerBuilder).batchingMaxPublishDelay(10,
TimeUnit.MILLISECONDS);
+ verify(producerBuilder).hashingScheme(HashingScheme.Murmur3_32Hash);
+
verify(producerBuilder).messageRoutingMode(MessageRoutingMode.CustomPartition);
+ verify(producerBuilder).messageRouter(FunctionResultRouter.of());
+ verify(producerBuilder).sendTimeout(0, TimeUnit.SECONDS);
+ verify(producerBuilder).topic("topic");
+ verify(producerBuilder).producerName("producerName");
+
+ verify(producerBuilder).compressionType(CompressionType.SNAPPY);
+ verify(producerBuilder).batcherBuilder(BatcherBuilder.KEY_BASED);
+ verify(producerBuilder).maxPendingMessages(5000);
+ verify(producerBuilder).maxPendingMessagesAcrossPartitions(50000);
+ TestCryptoKeyReader lastInstance = TestCryptoKeyReader.LAST_INSTANCE;
+ assertNotNull(lastInstance);
+ assertEquals(lastInstance.configs,
cryptoConfig.getCryptoKeyReaderConfig());
+ verify(producerBuilder).cryptoKeyReader(lastInstance);
+
verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL);
+ verify(producerBuilder).addEncryptionKey("key1");
+ verify(producerBuilder).addEncryptionKey("key2");
+ verify(producerBuilder).batchingMaxPublishDelay(20,
TimeUnit.MILLISECONDS);
+ verify(producerBuilder).batchingMaxMessages(100);
+ verifyNoMoreInteractions(producerBuilder);
+ }
+
+ @Test
+ public void testCreateProducerBuilderWithBatchingDisabled() {
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setBatchBuilder("KEY_BASED");
+ producerConfig.setCompressionType(CompressionType.SNAPPY);
+ producerConfig.setMaxPendingMessages(5000);
+ producerConfig.setMaxPendingMessagesAcrossPartitions(50000);
+ CryptoConfig cryptoConfig = new CryptoConfig();
+
cryptoConfig.setProducerCryptoFailureAction(ProducerCryptoFailureAction.FAIL);
+ cryptoConfig.setEncryptionKeys(new String[]{"key1", "key2"});
+ cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value"));
+
cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName());
+ producerConfig.setCryptoConfig(cryptoConfig);
+ BatchingConfig batchingConfig = new BatchingConfig();
+ batchingConfig.setEnabled(false);
+ batchingConfig.setBatchingMaxPublishDelayMs(0);
+ producerConfig.setBatchingConfig(batchingConfig);
ProducerBuilderFactory builderFactory = new
ProducerBuilderFactory(pulsarClient, producerConfig, null, null);
builderFactory.createProducerBuilder("topic", Schema.STRING,
"producerName");
verifyCommon();
@@ -153,12 +211,14 @@ public class ProducerBuilderFactoryTest {
verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL);
verify(producerBuilder).addEncryptionKey("key1");
verify(producerBuilder).addEncryptionKey("key2");
+ verify(producerBuilder).enableBatching(false);
verifyNoMoreInteractions(producerBuilder);
}
public static class TestCryptoKeyReader implements CryptoKeyReader {
static TestCryptoKeyReader LAST_INSTANCE;
Map<String, Object> configs;
+
public TestCryptoKeyReader(Map<String, Object> configs) {
this.configs = configs;
assert LAST_INSTANCE == null;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index de3f03a3900..5bd1a42b5a3 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -121,6 +121,7 @@ message ProducerSpec {
CryptoSpec cryptoSpec = 4;
string batchBuilder = 5;
CompressionType compressionType = 6;
+ BatchingSpec batchingSpec = 7;
}
message CryptoSpec {
@@ -147,6 +148,15 @@ message CryptoSpec {
FailureAction consumerCryptoFailureAction = 5;
}
+message BatchingSpec {
+ bool enabled = 1;
+ int32 batchingMaxPublishDelayMs = 2;
+ int32 roundRobinRouterBatchingPartitionSwitchFrequency = 3;
+ int32 batchingMaxMessages = 4;
+ int32 batchingMaxBytes = 5;
+ string batchBuilder = 6;
+}
+
message SourceSpec {
string className = 1;
// map in json format
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java
new file mode 100644
index 00000000000..7db61f7ace8
--- /dev/null
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.functions.proto.Function;
+
+public final class BatchingUtils {
+ public static Function.BatchingSpec convert(BatchingConfig config) {
+ if (config == null) {
+ return null;
+ }
+
+ Function.BatchingSpec.Builder builder =
Function.BatchingSpec.newBuilder()
+ .setEnabled(config.isEnabled());
+
+ if (config.getBatchingMaxPublishDelayMs() != null &&
config.getBatchingMaxPublishDelayMs() > 0) {
+
builder.setBatchingMaxPublishDelayMs(config.getBatchingMaxPublishDelayMs());
+ }
+ if (config.getRoundRobinRouterBatchingPartitionSwitchFrequency() !=
null
+ &&
config.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) {
+ builder.setRoundRobinRouterBatchingPartitionSwitchFrequency(
+
config.getRoundRobinRouterBatchingPartitionSwitchFrequency());
+ }
+ if (config.getBatchingMaxMessages() != null &&
config.getBatchingMaxMessages() > 0) {
+ builder.setBatchingMaxMessages(config.getBatchingMaxMessages());
+ }
+ if (config.getBatchingMaxBytes() != null &&
config.getBatchingMaxBytes() > 0) {
+ builder.setBatchingMaxBytes(config.getBatchingMaxBytes());
+ }
+ if (config.getBatchBuilder() != null &&
!config.getBatchBuilder().isEmpty()) {
+ builder.setBatchBuilder(config.getBatchBuilder());
+ }
+
+ return builder.build();
+ }
+
+ public static BatchingConfig convertFromSpec(Function.BatchingSpec spec) {
+ // to keep the backward compatibility, when batchingSpec is null or
empty
+ // the batching is enabled by default, and the default max publish
delay is 10ms
+ if (spec == null || spec.toString().equals("")) {
+ return BatchingConfig.builder()
+ .enabled(true)
+ .batchingMaxPublishDelayMs(10)
+ .build();
+ }
+
+ BatchingConfig.BatchingConfigBuilder builder = BatchingConfig.builder()
+ .enabled(spec.getEnabled());
+
+ if (spec.getBatchingMaxPublishDelayMs() > 0) {
+
builder.batchingMaxPublishDelayMs(spec.getBatchingMaxPublishDelayMs());
+ }
+ if (spec.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) {
+ builder.roundRobinRouterBatchingPartitionSwitchFrequency(
+
spec.getRoundRobinRouterBatchingPartitionSwitchFrequency());
+ }
+ if (spec.getBatchingMaxMessages() > 0) {
+ builder.batchingMaxMessages(spec.getBatchingMaxMessages());
+ }
+ if (spec.getBatchingMaxBytes() > 0) {
+ builder.batchingMaxBytes(spec.getBatchingMaxBytes());
+ }
+ if (spec.getBatchBuilder() != null &&
!spec.getBatchBuilder().isEmpty()) {
+ builder.batchBuilder(spec.getBatchBuilder());
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 45fb4c1cb1e..d19a77e9dea 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -524,6 +524,9 @@ public class FunctionConfigUtils {
if (producerConf.getBatchBuilder() != null) {
builder.setBatchBuilder(producerConf.getBatchBuilder());
}
+ if (producerConf.getBatchingConfig() != null) {
+
builder.setBatchingSpec(BatchingUtils.convert(producerConf.getBatchingConfig()));
+ }
if (producerConf.getCompressionType() != null) {
builder.setCompressionType(convertFromCompressionType(producerConf.getCompressionType()));
} else {
@@ -546,6 +549,9 @@ public class FunctionConfigUtils {
if (spec.getBatchBuilder() != null) {
producerConfig.setBatchBuilder(spec.getBatchBuilder());
}
+ if (spec.hasBatchingSpec()) {
+
producerConfig.setBatchingConfig(BatchingUtils.convertFromSpec(spec.getBatchingSpec()));
+ }
producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
producerConfig.setCompressionType(convertFromFunctionDetailsCompressionType(spec.getCompressionType()));
return producerConfig;
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java
new file mode 100644
index 00000000000..655c0d3837e
--- /dev/null
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import static org.testng.Assert.*;
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+public class BatchingUtilsTest {
+
+ @Test
+ public void testConvert() {
+ BatchingConfig config = BatchingConfig.builder()
+ .enabled(true)
+ .batchingMaxPublishDelayMs(30)
+ .roundRobinRouterBatchingPartitionSwitchFrequency(10)
+ .batchingMaxMessages(1000)
+ .batchBuilder("DEFAULT")
+ .build();
+ Function.BatchingSpec spec = BatchingUtils.convert(config);
+ assertEquals(spec.getEnabled(), true);
+ assertEquals(spec.getBatchingMaxPublishDelayMs(), 30);
+
assertEquals(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency(), 10);
+ assertEquals(spec.getBatchingMaxMessages(), 1000);
+ assertEquals(spec.getBatchingMaxBytes(), 0);
+ assertEquals(spec.getBatchBuilder(), "DEFAULT");
+ }
+
+ @Test
+ public void testConvertFromSpec() {
+ Function.BatchingSpec spec = Function.BatchingSpec.newBuilder()
+ .setEnabled(true)
+ .setBatchingMaxPublishDelayMs(30)
+ .setRoundRobinRouterBatchingPartitionSwitchFrequency(10)
+ .setBatchingMaxMessages(1000)
+ .setBatchBuilder("DEFAULT")
+ .build();
+ BatchingConfig config = BatchingUtils.convertFromSpec(spec);
+ assertEquals(config.isEnabled(), true);
+ assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 30);
+
assertEquals(config.getRoundRobinRouterBatchingPartitionSwitchFrequency().intValue(),
10);
+ assertEquals(config.getBatchingMaxMessages().intValue(), 1000);
+ assertEquals(config.getBatchingMaxBytes(), null);
+ assertEquals(config.getBatchBuilder(), "DEFAULT");
+ }
+
+ @Test
+ public void testConvertFromSpecFromNull() {
+ BatchingConfig config = BatchingUtils.convertFromSpec(null);
+ assertTrue(config.isEnabled());
+ assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 10);
+ }
+}
\ No newline at end of file
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 694dcba5eaf..ae94ddd1be5 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -62,8 +62,10 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.functions.BatchingConfig;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
@@ -695,7 +697,18 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
boolean pyZip,
boolean multipleInput,
boolean withExtraDeps,
-
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws
Exception {
+ ProducerConfig producerConfig)
throws Exception {
+ testExclamationFunction(runtime, isTopicPattern, pyZip, multipleInput,
withExtraDeps, producerConfig, null);
+ }
+
+ protected void testExclamationFunction(Runtime runtime,
+ boolean isTopicPattern,
+ boolean pyZip,
+ boolean multipleInput,
+ boolean withExtraDeps,
+ ProducerConfig producerConfig,
+
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+ throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && (runtime ==
Runtime.PYTHON || runtime == Runtime.GO)) {
// python&go can only run on process mode
return;
@@ -725,10 +738,21 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
// submit the exclamation function
submitExclamationFunction(
- runtime, inputTopicName, outputTopicName, functionName, pyZip,
withExtraDeps, schema, commandGeneratorConsumer);
+ runtime, inputTopicName, outputTopicName, functionName, pyZip,
withExtraDeps, schema,
+ commandGeneratorConsumer);
// get function info
final String info = getFunctionInfoSuccess(functionName);
+ FunctionConfig config =
ObjectMapperFactory.getMapper().getObjectMapper().readValue(info,
FunctionConfig.class);
+
+ // check batching config
+ if (runtime == Runtime.JAVA) {
+ BatchingConfig batchingConfig = null;
+ if (producerConfig != null && producerConfig.getBatchingConfig()
!= null) {
+ batchingConfig = producerConfig.getBatchingConfig();
+ }
+ checkBatchingConfig(functionName, batchingConfig, config);
+ }
// get function stats
getFunctionStatsEmpty(functionName);
@@ -770,8 +794,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
break;
}
- checkSubscriptionType(inputTopicName,
-
ObjectMapperFactory.getMapper().getObjectMapper().readValue(info,
FunctionConfig.class));
+ // check subscription type
+ checkSubscriptionType(inputTopicName, config);
// delete function
deleteFunction(functionName);
@@ -819,6 +843,26 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
});
}
+ // checking batching config, we can only check this by checking the logs
for now
+ private void checkBatchingConfig(String functionName, BatchingConfig
config, FunctionConfig functionConfig) {
+ if (config != null) {
+ assertNotNull(functionConfig.getProducerConfig());
+
assertNotNull(functionConfig.getProducerConfig().getBatchingConfig());
+ assertEquals(config.toString(),
functionConfig.getProducerConfig().getBatchingConfig().toString());
+ }
+
+ String functionLogs = pulsarCluster.getFunctionLogs(functionName);
+ if (config == null || config.isEnabled()) {
+ BatchingConfig finalConfig = config;
+ if (finalConfig == null) {
+ finalConfig = BatchingConfig.builder().build();
+ }
+ assertTrue(functionLogs.contains(finalConfig.toString()));
+ } else {
+ assertTrue(functionLogs.contains("BatchingConfig(enabled=false"));
+ }
+ }
+
private void submitExclamationFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
@@ -837,7 +881,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
boolean pyZip,
boolean withExtraDeps,
Schema<?> schema,
-
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws
Exception {
+
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+ throws Exception {
submitFunction(
runtime,
inputTopicName,
@@ -860,7 +905,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
boolean isPublishFunction,
String functionClass,
Schema<T> inputTopicSchema,
-
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws
Exception {
+
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+ throws Exception {
String file = null;
if (Runtime.JAVA == runtime) {
@@ -894,7 +940,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
String functionFile,
String functionClass,
Schema<T> inputTopicSchema,
-
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws
Exception {
+
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+ throws Exception {
submitFunction(runtime, inputTopicName, outputTopicName, functionName,
functionFile, functionClass,
inputTopicSchema, null, null, null, null, null, null,
commandGeneratorConsumer);
@@ -913,7 +960,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
SubscriptionInitialPosition
subscriptionInitialPosition,
String inputTypeClassName,
String outputTypeClassName,
-
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws
Exception {
+
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+ throws Exception {
if (StringUtils.isNotEmpty(inputTopicName)) {
ensureSubscriptionCreated(
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 939d6e19d1f..fc9ffe9da0d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.Map;
import org.apache.commons.collections4.map.HashedMap;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -109,6 +111,35 @@ public abstract class PulsarFunctionsJavaTest extends
PulsarFunctionsTest {
testExclamationFunction(Runtime.JAVA, true, false, false, false);
}
+ @Test(groups = {"java_function", "function"})
+ public void testJavaExclamationCustomBatchingFunction() throws Exception {
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setBatchingConfig(BatchingConfig.builder()
+ .enabled(true)
+ .batchingMaxPublishDelayMs(5)
+ .batchingMaxMessages(100)
+ .batchingMaxBytes(64 * 1024)
+ .roundRobinRouterBatchingPartitionSwitchFrequency(5)
+ .batchBuilder("KEY_BASED")
+ .build());
+ testExclamationFunction(Runtime.JAVA, false, false, false, false,
+ producerConfig, commandGenerator -> {
+ commandGenerator.setProducerConfig(producerConfig);
+ });
+ }
+
+ @Test(groups = {"java_function", "function"})
+ public void testJavaExclamationDiableBatchingFunction() throws Exception {
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setBatchingConfig(BatchingConfig.builder()
+ .enabled(false)
+ .build());
+ testExclamationFunction(Runtime.JAVA, false, false, false, false,
+ producerConfig, commandGenerator -> {
+ commandGenerator.setProducerConfig(producerConfig);
+ });
+ }
+
@Test(groups = {"java_function", "function"})
public void testJavaLoggingFunction() throws Exception {
testLoggingFunction(Runtime.JAVA);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
index 9ba210b9988..05441f0111b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
@@ -72,7 +72,7 @@ public abstract class PulsarFunctionsPythonTest extends
PulsarFunctionsTest {
@Test(groups = {"python_function", "function"})
public void testRetainOrderingTest() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, false, false,
- false, generator -> {
+ false, null, generator -> {
generator.setRetainOrdering(true);
});
}
@@ -80,7 +80,7 @@ public abstract class PulsarFunctionsPythonTest extends
PulsarFunctionsTest {
@Test(groups = {"python_function", "function"})
public void testRetainKeyOrderingTest() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, false, false,
- false, generator -> {
+ false, null, generator -> {
System.out.println("calling
generator.setRetainKeyOrdering(true);");
generator.setRetainKeyOrdering(true);
});
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index e0fbd604007..08765b5e535 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -26,6 +26,7 @@ import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@Getter
@@ -66,6 +67,7 @@ public class CommandGenerator {
private SubscriptionInitialPosition subscriptionInitialPosition;
private Boolean retainOrdering;
private Boolean retainKeyOrdering;
+ private ProducerConfig producerConfig;
private Map<String, String> userConfig = new HashMap<>();
public static final String JAVAJAR =
"/pulsar/examples/java-test-functions.jar";
@@ -256,6 +258,9 @@ public class CommandGenerator {
}
break;
}
+ if (producerConfig != null) {
+ commandBuilder.append(" --producer-config \'" + new
Gson().toJson(producerConfig) + "\'");
+ }
return commandBuilder.toString();
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java
new file mode 100644
index 00000000000..79981fce2b4
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sources;
+
+import com.google.gson.Gson;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+import static
org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * This test verifies that a data-generator source can be successfully submitted and run via the pulsar-admin CLI.
+ */
+@Slf4j
+public class DataGeneratorSourceTest extends PulsarStandaloneTestSuite {
+
+    /**
+     * Runs the data-generator source scenario with a {@code null} batching config, i.e. the
+     * source is submitted without a {@code --producer-config} argument.
+     */
+    @Test(groups = {"source"})
+    public void testSource() throws Exception {
+        final BatchingConfig noBatchingOverride = null;
+        testGenericRecordSource(noBatchingOverride);
+    }
+
+    /**
+     * Runs the data-generator source scenario with batching enabled and every batching
+     * option set to an explicit value.
+     */
+    @Test(groups = {"source"})
+    public void testSourceCustomBatching() throws Exception {
+        testGenericRecordSource(
+                BatchingConfig.builder()
+                        .enabled(true)
+                        .batchingMaxPublishDelayMs(5)
+                        .roundRobinRouterBatchingPartitionSwitchFrequency(10)
+                        .batchingMaxMessages(10)
+                        .batchingMaxBytes(32 * 1024)
+                        .batchBuilder("KEY_BASED")
+                        .build());
+    }
+
+    /**
+     * Runs the data-generator source scenario with batching explicitly switched off.
+     */
+    @Test(groups = {"source"})
+    public void testSourceDisableBatching() throws Exception {
+        BatchingConfig batchingDisabled = BatchingConfig.builder().enabled(false).build();
+        testGenericRecordSource(batchingDisabled);
+    }
+
+    /**
+     * Submits the built-in data-generator source through the pulsar-admin CLI, waits until it is
+     * running and has written messages, verifies the batching configuration and finally deletes
+     * the source. The function logs are dumped in all cases.
+     *
+     * @param config batching settings to wrap into the producer config, or {@code null} to submit
+     *               the source without any producer config
+     * @throws Exception if a CLI command, an admin call or an assertion fails
+     */
+    public void testGenericRecordSource(BatchingConfig config) throws Exception {
+        String outputTopicName = "test-state-source-output-" + randomName(8);
+        String sourceName = "test-state-source-" + randomName(8);
+        // minimum number of written messages the source must report before it counts as working
+        final int numMessages = 10;
+        try {
+            ProducerConfig producerConfig = null;
+            if (config != null) {
+                producerConfig = ProducerConfig.builder()
+                        .batchingConfig(config)
+                        .build();
+            }
+            submitSourceConnector(
+                    sourceName,
+                    outputTopicName,
+                    "builtin://data-generator",
+                    producerConfig);
+
+            // get source info
+            String info = getSourceInfoSuccess(container, sourceName);
+            SourceConfig sourceConfig =
+                    ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, SourceConfig.class);
+
+            // get source status
+            getSourceStatus(container, sourceName);
+
+            try (PulsarAdmin admin =
+                         PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+                retryStrategically((test) -> {
+                    try {
+                        SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                        return status.getInstances().size() > 0
+                                && status.getInstances().get(0).getStatus().numWritten >= numMessages;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 10, 200);
+
+                SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                assertEquals(status.getInstances().size(), 1);
+                assertTrue(status.getInstances().get(0).getStatus().numWritten >= numMessages);
+            }
+
+            // The batching check reads the instance log file, which may not exist yet right after
+            // submission; run it only once the source has demonstrably produced messages.
+            checkBatchingConfig(sourceName, config, sourceConfig);
+
+            // delete source
+            deleteSource(container, sourceName);
+
+            getSourceInfoNotFound(container, sourceName);
+        } finally {
+            dumpFunctionLogs(sourceName);
+        }
+    }
+
+    /**
+     * Creates a source with {@code pulsar-admin sources create} inside the standalone container
+     * and asserts that the command reports success.
+     *
+     * @param sourceName      name of the source to create
+     * @param outputTopicName value passed as {@code --destinationTopicName}
+     * @param archive         value passed as {@code --archive}
+     * @param producerConfig  serialized to JSON and passed as {@code --producer-config};
+     *                        the argument is omitted when {@code null}
+     * @throws Exception if executing the command fails
+     */
+    private void submitSourceConnector(String sourceName,
+                                       String outputTopicName,
+                                       String archive,
+                                       ProducerConfig producerConfig) throws Exception {
+        List<String> argv = new ArrayList<>();
+        argv.addAll(List.of(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources", "create",
+                "--name", sourceName,
+                "--destinationTopicName", outputTopicName,
+                "--archive", archive));
+        if (producerConfig != null) {
+            String producerConfigJson = new Gson().toJson(producerConfig);
+            argv.add("--producer-config");
+            argv.add(producerConfigJson);
+        }
+        log.info("Run command : {}", StringUtils.join(argv, ' '));
+
+        ContainerExecResult execResult = container.execCmd(argv.toArray(new String[0]));
+        String stdout = execResult.getStdout();
+        assertTrue(stdout.contains("Created successfully"), execResult.getStdout());
+    }
+
+ private static String getSourceInfoSuccess(StandaloneContainer container,
String sourceName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName
+ );
+ assertTrue(result.getStdout().contains("\"name\": \"" + sourceName +
"\""));
+ return result.getStdout();
+ }
+
+ private static void getSourceStatus(StandaloneContainer container, String
sourceName) throws Exception {
+ retryStrategically((test) -> {
+ try {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+
+ if (result.getStdout().contains("\"running\" : true")) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ log.error("Encountered error when getting source status", e);
+ return false;
+ }
+ }, 10, 200);
+
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+
+ Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
+ }
+
+ // checking batching config, we can only check this by checking the logs
for now
+ private void checkBatchingConfig(String functionName, BatchingConfig
config, SourceConfig sourceConfig) {
+ if (config != null) {
+ assertNotNull(sourceConfig.getProducerConfig());
+
assertNotNull(sourceConfig.getProducerConfig().getBatchingConfig());
+ assertEquals(config.toString(),
sourceConfig.getProducerConfig().getBatchingConfig().toString());
+ }
+
+ String functionLogs = getFunctionLogs(functionName);
+ if (config == null || config.isEnabled()) {
+ BatchingConfig finalConfig = config;
+ if (finalConfig == null) {
+ finalConfig = BatchingConfig.builder().build();
+ }
+ assertTrue(functionLogs.contains(finalConfig.toString()));
+ } else {
+ assertTrue(functionLogs.contains("BatchingConfig(enabled=false"));
+ }
+ }
+
+    /**
+     * Deletes the source in public/default with {@code pulsar-admin sources delete} and asserts
+     * that the command printed the success message and nothing on stderr.
+     *
+     * @throws Exception if executing the command fails
+     */
+    private static void deleteSource(StandaloneContainer container, String sourceName) throws Exception {
+        ContainerExecResult deletion = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName);
+
+        String stdout = deletion.getStdout();
+        assertTrue(stdout.contains("Delete source successfully"));
+        String stderr = deletion.getStderr();
+        assertTrue(stderr.isEmpty());
+    }
+
+ private static void getSourceInfoNotFound(StandaloneContainer container,
String sourceName) throws Exception {
+ try {
+ container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Source " +
sourceName + " doesn't exist"));
+ }
+ }
+
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 35fb453c4bb..6e3c4530f5d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -720,6 +720,21 @@ public class PulsarCluster {
enabled ? "--enable" : "--disable");
}
+ public String getFunctionLogs(String name) {
+ StringBuilder logs = new StringBuilder();
+ for (WorkerContainer container : getAlWorkers()) {
+ try {
+ String logFile = "/pulsar/logs/functions/public/default/" +
name + "/" + name + "-0.log";
+ logs.append(container.<String>copyFileFromContainer(logFile,
(inputStream) -> {
+ return IOUtils.toString(inputStream, "utf-8");
+ }));
+ } catch (Exception e) {
+ log.error("Failed to get function logs from container {}",
container.getContainerName(), e);
+ }
+ }
+ return logs.toString();
+ }
+
public void dumpFunctionLogs(String name) {
for (WorkerContainer container : getAlWorkers()) {
log.info("Trying to get function {} logs from container {}", name,
container.getContainerName());
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 411b5217501..23a17a1b896 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -107,7 +107,17 @@ public abstract class PulsarStandaloneTestBase extends
PulsarTestBase {
}
}
-
+ protected String getFunctionLogs(String name) {
+ try {
+ String logFile = "/pulsar/logs/functions/public/default/" + name +
"/" + name + "-0.log";
+ return container.<String>copyFileFromContainer(logFile,
(inputStream) -> {
+ return IOUtils.toString(inputStream, "utf-8");
+ });
+ } catch (Throwable err) {
+ log.info("Cannot get {} logs", name, err);
+ return "";
+ }
+ }
protected void dumpFunctionLogs(String name) {
try {
diff --git a/tests/integration/src/test/resources/pulsar-io-sources.xml
b/tests/integration/src/test/resources/pulsar-io-sources.xml
index 636b3e47919..3d3b150f664 100644
--- a/tests/integration/src/test/resources/pulsar-io-sources.xml
+++ b/tests/integration/src/test/resources/pulsar-io-sources.xml
@@ -24,6 +24,7 @@
<classes>
<class
name="org.apache.pulsar.tests.integration.io.sources.debezium.PulsarDebeziumSourcesTest"
/>
<class
name="org.apache.pulsar.tests.integration.io.sources.BatchSourceTest" />
+ <class
name="org.apache.pulsar.tests.integration.io.sources.DataGeneratorSourceTest" />
<class
name="org.apache.pulsar.tests.integration.io.sources.AvroKafkaSourceTest" />
</classes>
</test>