This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a4ca8f4  Revert "[fix #9851] Add forwardSourceMessageProperty to SourceConfig (#9907)" (#9945)
a4ca8f4 is described below

commit a4ca8f4b4b5fdd034f4bba2c17a0e40a6c33394b
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Mar 18 15:13:58 2021 -0700

    Revert "[fix #9851] Add forwardSourceMessageProperty to SourceConfig (#9907)" (#9945)
    
    This reverts commit d0249e5695c75d8d65a4180fd599b17971368f33.
    
    Co-authored-by: Jerry Peng <jer...@splunk.com>
---
 .../org/apache/pulsar/admin/cli/CmdSources.java    |   6 -
 .../apache/pulsar/admin/cli/TestCmdSources.java    |  13 +-
 .../org/apache/pulsar/common/io/SourceConfig.java  |   2 -
 .../pulsar/functions/utils/SourceConfigUtils.java  |  11 --
 .../functions/utils/SourceConfigUtilsTest.java     |   1 -
 .../tests/integration/io/TestPropertySource.java   |  63 --------
 .../integration/io/PulsarSourcePropertyTest.java   | 169 ---------------------
 .../src/test/resources/pulsar-function.xml         |   1 -
 8 files changed, 2 insertions(+), 264 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index a6daf26..7f580f9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -315,8 +315,6 @@ public class CmdSources extends CmdBase {
         protected String sourceConfigString;
         @Parameter(names = "--custom-runtime-options", description = "A string 
that encodes options to customize the runtime, see docs for configured runtime 
for details")
         protected String customRuntimeOptions;
-        @Parameter(names = "--forward-source-message-property", description = 
"Forwarding input message's properties to output topic when processing")
-        protected Boolean forwardSourceMessageProperty = true;
 
         protected SourceConfig sourceConfig;
 
@@ -421,10 +419,6 @@ public class CmdSources extends CmdBase {
             if (customRuntimeOptions != null) {
                 sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
             }
-
-            if (null != forwardSourceMessageProperty) {
-                
sourceConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
-            }
             // check if source configs are valid
             validateSourceConfigs(sourceConfig);
         }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 8a79c0a..4e6d95a 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -73,7 +73,6 @@ public class TestCmdSources {
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon 
Jul 02 00:33:15 +0000 2018\"}";
-    private static final boolean FORWARD_PROPERTIES = true;
 
     private PulsarAdmin pulsarAdmin;
     private Sources source;
@@ -115,7 +114,6 @@ public class TestCmdSources {
         sourceConfig.setArchive(JAR_FILE_PATH);
         sourceConfig.setResources(new Resources(CPU, RAM, DISK));
         sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING));
-        sourceConfig.setForwardSourceMessageProperty(FORWARD_PROPERTIES);
         return sourceConfig;
     }
 
@@ -577,19 +575,15 @@ public class TestCmdSources {
 
         updateSource.archive = "new-archive";
 
-        updateSource.forwardSourceMessageProperty = true;
-
         updateSource.processArguments();
 
         updateSource.runCmd();
 
-
         verify(source).updateSource(eq(SourceConfig.builder()
                 .tenant(PUBLIC_TENANT)
                 .namespace(DEFAULT_NAMESPACE)
                 .name(updateSource.name)
                 .archive(updateSource.archive)
-                .forwardSourceMessageProperty(true)
                 .build()), eq(updateSource.archive), eq(new UpdateOptions()));
 
 
@@ -597,12 +591,10 @@ public class TestCmdSources {
 
         updateSource.parallelism = 2;
 
-        updateSource.updateAuthData = true;
-
-        updateSource.forwardSourceMessageProperty = false;
-
         updateSource.processArguments();
 
+        updateSource.updateAuthData = true;
+
         UpdateOptions updateOptions = new UpdateOptions();
         updateOptions.setUpdateAuthData(true);
 
@@ -613,7 +605,6 @@ public class TestCmdSources {
                 .namespace(DEFAULT_NAMESPACE)
                 .name(updateSource.name)
                 .parallelism(2)
-                .forwardSourceMessageProperty(false)
                 .build()), eq(null), eq(updateOptions));
 
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index ccd4d14..79a5f81 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -71,6 +71,4 @@ public class SourceConfig {
     private BatchSourceConfig batchSourceConfig;
     // batchBuilder provides two types of batch construction methods, DEFAULT 
and KEY_BASED
     private String batchBuilder;
-
-    private Boolean forwardSourceMessageProperty;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index e658f2a..b9825cc 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -148,12 +148,6 @@ public class SourceConfigUtils {
             
sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
         }
 
-        if (sourceConfig.getForwardSourceMessageProperty() == Boolean.TRUE) {
-            
sinkSpecBuilder.setForwardSourceMessageProperty(sourceConfig.getForwardSourceMessageProperty());
-        } else {
-            sinkSpecBuilder.setForwardSourceMessageProperty(false);
-        }
-
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
         // use default resources if resources not set
@@ -242,8 +236,6 @@ public class SourceConfigUtils {
             
sourceConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions());
         }
 
-        
sourceConfig.setForwardSourceMessageProperty(sinkSpec.getForwardSourceMessageProperty());
-
         return sourceConfig;
     }
 
@@ -400,9 +392,6 @@ public class SourceConfigUtils {
             
validateBatchSourceConfigUpdate(existingConfig.getBatchSourceConfig(), 
newConfig.getBatchSourceConfig());
             
mergedConfig.setBatchSourceConfig(newConfig.getBatchSourceConfig());
         }
-        if (newConfig.getForwardSourceMessageProperty() != null) {
-            
mergedConfig.setForwardSourceMessageProperty(newConfig.getForwardSourceMessageProperty());
-        }
         return mergedConfig;
     }
 
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index c2dc029..20a64f8 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -358,7 +358,6 @@ public class SourceConfigUtilsTest extends PowerMockTestCase {
         sourceConfig.setParallelism(1);
         sourceConfig.setRuntimeFlags("-DKerberos");
         
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        sourceConfig.setForwardSourceMessageProperty(true);
 
         Map<String, String> consumerConfigs = new HashMap<>();
         consumerConfigs.put("security.protocal", "SASL_PLAINTEXT");
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java
deleted file mode 100644
index 245929b..0000000
--- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-public class TestPropertySource implements Source<String> {
-
-    @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-    }
-
-    @Override
-    public Record<String> read() throws Exception {
-        Thread.sleep(50);
-        return new Record<String>() {
-            @Override
-            public Optional<String> getKey() {
-                return Optional.empty();
-            }
-
-            @Override
-            public String getValue() {
-                return "property";
-            }
-            @Override
-            public Map<String, String> getProperties() {
-                HashMap<String, String> props = new HashMap<String, String>();
-                props.put("hello", "world");
-                props.put("foo", "bar");
-                return props;
-            }
-        };
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java
deleted file mode 100644
index 04f97a1..0000000
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.functions.FunctionState;
-import org.apache.pulsar.common.policies.data.SinkStatus;
-import org.apache.pulsar.common.policies.data.SourceStatus;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
-import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.awaitility.Awaitility;
-import org.testng.annotations.Test;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
-import static 
org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-/**
- * Source Property related test cases.
- */
-@Slf4j
-public class PulsarSourcePropertyTest extends PulsarStandaloneTestSuite {
-    @Test(groups = {"source"})
-    public void testSourceProperty() throws Exception {
-        String outputTopicName = "test-source-property-input-" + randomName(8);
-        String sourceName = "test-source-property-" + randomName(8);
-        submitSourceConnector(sourceName, outputTopicName, 
"org.apache.pulsar.tests.integration.io.TestPropertySource",  JAVAJAR);
-
-        // get source info
-        getSourceInfoSuccess(sourceName);
-
-        // get source status
-        getSourceStatus(sourceName);
-
-        try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-
-            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
-                SourceStatus status = 
admin.sources().getSourceStatus("public", "default", sourceName);
-                assertEquals(status.getInstances().size(), 1);
-                assertTrue(status.getInstances().get(0).getStatus().numWritten 
> 0);
-            });
-        }
-
-        @Cleanup PulsarClient client = PulsarClient.builder()
-                .serviceUrl(container.getPlainTextServiceUrl())
-                .build();
-        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
-                .topic(outputTopicName)
-                .subscriptionType(SubscriptionType.Exclusive)
-                .subscriptionName("test-sub")
-                .subscribe();
-
-        for (int i = 0; i < 10; i++) {
-            Message<String> msg = consumer.receive();
-            assertEquals(msg.getValue(), "property");
-            assertEquals(msg.getProperty("hello"), "world");
-            assertEquals(msg.getProperty("foo"), "bar");
-        }
-
-        // delete source
-        deleteSource(sourceName);
-
-        getSourceInfoNotFound(sourceName);
-    }
-
-    private void submitSourceConnector(String sourceName,
-                                       String outputTopicName,
-                                       String className,
-                                       String archive) throws Exception {
-        String[] commands = {
-                PulsarCluster.ADMIN_SCRIPT,
-                "sources", "create",
-                "--name", sourceName,
-                "--destinationTopicName", outputTopicName,
-                "--archive", archive,
-                "--classname", className
-        };
-        log.info("Run command : {}", StringUtils.join(commands, ' '));
-        ContainerExecResult result = container.execCmd(commands);
-        assertTrue(
-                result.getStdout().contains("\"Created successfully\""),
-                result.getStdout());
-    }
-
-    private void getSourceInfoSuccess(String sourceName) throws Exception {
-        ContainerExecResult result = container.execCmd(
-                PulsarCluster.ADMIN_SCRIPT,
-                "sources",
-                "get",
-                "--tenant", "public",
-                "--namespace", "default",
-                "--name", sourceName
-        );
-        assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + 
"\""));
-    }
-
-    private void getSourceStatus(String sourceName) throws Exception {
-        ContainerExecResult result = container.execCmd(
-                PulsarCluster.ADMIN_SCRIPT,
-                "sources",
-                "status",
-                "--tenant", "public",
-                "--namespace", "default",
-                "--name", sourceName
-        );
-        assertTrue(result.getStdout().contains("\"running\" : true"));
-    }
-
-    private void deleteSource(String sourceName) throws Exception {
-        ContainerExecResult result = container.execCmd(
-                PulsarCluster.ADMIN_SCRIPT,
-                "sources",
-                "delete",
-                "--tenant", "public",
-                "--namespace", "default",
-                "--name", sourceName
-        );
-        assertTrue(result.getStdout().contains("Delete source successfully"));
-        result.assertNoStderr();
-    }
-
-    private void getSourceInfoNotFound(String sourceName) throws Exception {
-        try {
-            container.execCmd(
-                    PulsarCluster.ADMIN_SCRIPT,
-                    "sources",
-                    "get",
-                    "--tenant", "public",
-                    "--namespace", "default",
-                    "--name", sourceName);
-            fail("Command should have exited with non-zero");
-        } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Source " + 
sourceName + " doesn't exist"));
-        }
-    }
-}
diff --git a/tests/integration/src/test/resources/pulsar-function.xml b/tests/integration/src/test/resources/pulsar-function.xml
index eeda827..026b77f 100644
--- a/tests/integration/src/test/resources/pulsar-function.xml
+++ b/tests/integration/src/test/resources/pulsar-function.xml
@@ -24,7 +24,6 @@
         <classes>
             <class 
name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
             <class 
name="org.apache.pulsar.tests.integration.io.GenericRecordSourceTest" />
-            <class 
name="org.apache.pulsar.tests.integration.io.PulsarSourcePropertyTest" />
         </classes>
     </test>
 </suite>

Reply via email to