This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a4ca8f4 Revert "[fix #9851] Add forwardSourceMessageProperty to SourceConfig (#9907)" (#9945) a4ca8f4 is described below commit a4ca8f4b4b5fdd034f4bba2c17a0e40a6c33394b Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu Mar 18 15:13:58 2021 -0700 Revert "[fix #9851] Add forwardSourceMessageProperty to SourceConfig (#9907)" (#9945) This reverts commit d0249e5695c75d8d65a4180fd599b17971368f33. Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../org/apache/pulsar/admin/cli/CmdSources.java | 6 - .../apache/pulsar/admin/cli/TestCmdSources.java | 13 +- .../org/apache/pulsar/common/io/SourceConfig.java | 2 - .../pulsar/functions/utils/SourceConfigUtils.java | 11 -- .../functions/utils/SourceConfigUtilsTest.java | 1 - .../tests/integration/io/TestPropertySource.java | 63 -------- .../integration/io/PulsarSourcePropertyTest.java | 169 --------------------- .../src/test/resources/pulsar-function.xml | 1 - 8 files changed, 2 insertions(+), 264 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index a6daf26..7f580f9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -315,8 +315,6 @@ public class CmdSources extends CmdBase { protected String sourceConfigString; @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details") protected String customRuntimeOptions; - @Parameter(names = "--forward-source-message-property", description = "Forwarding input message's properties to output topic when processing") - protected Boolean forwardSourceMessageProperty = true; protected SourceConfig sourceConfig; @@ -421,10 +419,6 @@ public class CmdSources extends CmdBase { if (customRuntimeOptions != null) { sourceConfig.setCustomRuntimeOptions(customRuntimeOptions); } - - if (null != forwardSourceMessageProperty) { - sourceConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty); - } // check if source configs are valid validateSourceConfigs(sourceConfig); } diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index 8a79c0a..4e6d95a 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -73,7 +73,6 @@ public class TestCmdSources { private static final Long RAM = 1024L * 1024L; private static final Long DISK = 1024L * 1024L * 1024L; private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}"; - private static final boolean FORWARD_PROPERTIES = true; private PulsarAdmin pulsarAdmin; private Sources source; @@ -115,7 +114,6 @@ public class TestCmdSources { sourceConfig.setArchive(JAR_FILE_PATH); sourceConfig.setResources(new Resources(CPU, RAM, DISK)); sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING)); - sourceConfig.setForwardSourceMessageProperty(FORWARD_PROPERTIES); return sourceConfig; } @@ -577,19 +575,15 @@ public class TestCmdSources { updateSource.archive = "new-archive"; - updateSource.forwardSourceMessageProperty = true; - updateSource.processArguments(); updateSource.runCmd(); - verify(source).updateSource(eq(SourceConfig.builder() .tenant(PUBLIC_TENANT) .namespace(DEFAULT_NAMESPACE) .name(updateSource.name) .archive(updateSource.archive) - .forwardSourceMessageProperty(true) .build()), eq(updateSource.archive), eq(new UpdateOptions())); @@ -597,12 +591,10 @@ public class TestCmdSources { updateSource.parallelism = 2; - updateSource.updateAuthData = true; - - updateSource.forwardSourceMessageProperty = false; - updateSource.processArguments(); + updateSource.updateAuthData = true; + UpdateOptions updateOptions = new UpdateOptions(); updateOptions.setUpdateAuthData(true); @@ -613,7 +605,6 @@ public class TestCmdSources { .namespace(DEFAULT_NAMESPACE) .name(updateSource.name) .parallelism(2) - .forwardSourceMessageProperty(false) .build()), eq(null), eq(updateOptions)); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index ccd4d14..79a5f81 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -71,6 +71,4 @@ public class SourceConfig { private BatchSourceConfig batchSourceConfig; // batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED private String batchBuilder; - - private Boolean forwardSourceMessageProperty; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index e658f2a..b9825cc 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -148,12 +148,6 @@ public class SourceConfigUtils { sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig())); } - if (sourceConfig.getForwardSourceMessageProperty() == Boolean.TRUE) { - sinkSpecBuilder.setForwardSourceMessageProperty(sourceConfig.getForwardSourceMessageProperty()); - } else { - sinkSpecBuilder.setForwardSourceMessageProperty(false); - } - functionDetailsBuilder.setSink(sinkSpecBuilder); // use default resources if resources not set @@ -242,8 +236,6 @@ public class SourceConfigUtils { sourceConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions()); } - sourceConfig.setForwardSourceMessageProperty(sinkSpec.getForwardSourceMessageProperty()); - return sourceConfig; } @@ -400,9 +392,6 @@ public class SourceConfigUtils { validateBatchSourceConfigUpdate(existingConfig.getBatchSourceConfig(), newConfig.getBatchSourceConfig()); mergedConfig.setBatchSourceConfig(newConfig.getBatchSourceConfig()); } - if (newConfig.getForwardSourceMessageProperty() != null) { - mergedConfig.setForwardSourceMessageProperty(newConfig.getForwardSourceMessageProperty()); - } return mergedConfig; } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index c2dc029..20a64f8 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -358,7 +358,6 @@ public class SourceConfigUtilsTest extends PowerMockTestCase { sourceConfig.setParallelism(1); sourceConfig.setRuntimeFlags("-DKerberos"); sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - sourceConfig.setForwardSourceMessageProperty(true); Map<String, String> consumerConfigs = new HashMap<>(); consumerConfigs.put("security.protocal", "SASL_PLAINTEXT"); diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java deleted file mode 100644 index 245929b..0000000 --- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.tests.integration.io; - -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.Source; -import org.apache.pulsar.io.core.SourceContext; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -public class TestPropertySource implements Source<String> { - - @Override - public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { - } - - @Override - public Record<String> read() throws Exception { - Thread.sleep(50); - return new Record<String>() { - @Override - public Optional<String> getKey() { - return Optional.empty(); - } - - @Override - public String getValue() { - return "property"; - } - @Override - public Map<String, String> getProperties() { - HashMap<String, String> props = new HashMap<String, String>(); - props.put("hello", "world"); - props.put("foo", "bar"); - return props; - } - }; - } - - @Override - public void close() throws Exception { - - } -} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java deleted file mode 100644 index 04f97a1..0000000 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.tests.integration.io; - -import lombok.Cleanup; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.functions.FunctionState; -import org.apache.pulsar.common.policies.data.SinkStatus; -import org.apache.pulsar.common.policies.data.SourceStatus; -import org.apache.pulsar.tests.integration.docker.ContainerExecException; -import org.apache.pulsar.tests.integration.docker.ContainerExecResult; -import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; -import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime; -import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; -import org.awaitility.Awaitility; -import org.testng.annotations.Test; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; -import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -/** - * Source Property related test cases. - */ -@Slf4j -public class PulsarSourcePropertyTest extends PulsarStandaloneTestSuite { - @Test(groups = {"source"}) - public void testSourceProperty() throws Exception { - String outputTopicName = "test-source-property-input-" + randomName(8); - String sourceName = "test-source-property-" + randomName(8); - submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestPropertySource", JAVAJAR); - - // get source info - getSourceInfoSuccess(sourceName); - - // get source status - getSourceStatus(sourceName); - - try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { - - Awaitility.await().ignoreExceptions().untilAsserted(() -> { - SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); - assertEquals(status.getInstances().size(), 1); - assertTrue(status.getInstances().get(0).getStatus().numWritten > 0); - }); - } - - @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(container.getPlainTextServiceUrl()) - .build(); - @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING) - .topic(outputTopicName) - .subscriptionType(SubscriptionType.Exclusive) - .subscriptionName("test-sub") - .subscribe(); - - for (int i = 0; i < 10; i++) { - Message<String> msg = consumer.receive(); - assertEquals(msg.getValue(), "property"); - assertEquals(msg.getProperty("hello"), "world"); - assertEquals(msg.getProperty("foo"), "bar"); - } - - // delete source - deleteSource(sourceName); - - getSourceInfoNotFound(sourceName); - } - - private void submitSourceConnector(String sourceName, - String outputTopicName, - String className, - String archive) throws Exception { - String[] commands = { - PulsarCluster.ADMIN_SCRIPT, - "sources", "create", - "--name", sourceName, - "--destinationTopicName", outputTopicName, - "--archive", archive, - "--classname", className - }; - log.info("Run command : {}", StringUtils.join(commands, ' ')); - ContainerExecResult result = container.execCmd(commands); - assertTrue( - result.getStdout().contains("\"Created successfully\""), - result.getStdout()); - } - - private void getSourceInfoSuccess(String sourceName) throws Exception { - ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "sources", - "get", - "--tenant", "public", - "--namespace", "default", - "--name", sourceName - ); - assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\"")); - } - - private void getSourceStatus(String sourceName) throws Exception { - ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "sources", - "status", - "--tenant", "public", - "--namespace", "default", - "--name", sourceName - ); - assertTrue(result.getStdout().contains("\"running\" : true")); - } - - private void deleteSource(String sourceName) throws Exception { - ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "sources", - "delete", - "--tenant", "public", - "--namespace", "default", - "--name", sourceName - ); - assertTrue(result.getStdout().contains("Delete source successfully")); - result.assertNoStderr(); - } - - private void getSourceInfoNotFound(String sourceName) throws Exception { - try { - container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "sources", - "get", - "--tenant", "public", - "--namespace", "default", - "--name", sourceName); - fail("Command should have exited with non-zero"); - } catch (ContainerExecException e) { - assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist")); - } - } -} diff --git a/tests/integration/src/test/resources/pulsar-function.xml b/tests/integration/src/test/resources/pulsar-function.xml index eeda827..026b77f 100644 --- a/tests/integration/src/test/resources/pulsar-function.xml +++ b/tests/integration/src/test/resources/pulsar-function.xml @@ -24,7 +24,6 @@ <classes> <class name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" /> <class name="org.apache.pulsar.tests.integration.io.GenericRecordSourceTest" /> - <class name="org.apache.pulsar.tests.integration.io.PulsarSourcePropertyTest" /> </classes> </test> </suite>