This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7861a78  Change logging for debezium key value pairs (#5172)
7861a78 is described below

commit 7861a78b9fdbb17275b0e4fd910d6a150f88496b
Author: Ali Ahmed <alahmed...@gmail.com>
AuthorDate: Thu Sep 12 09:00:09 2019 -0700

    Change logging for debezium key value pairs (#5172)
    
    * Make naming consistent for Debezium postgreSql
    
    * Change logging for debezium key value pairs
---
 ...ainer.java => DebeziumPostgreSqlContainer.java} |  4 ++--
 .../integration/functions/PulsarFunctionsTest.java | 14 +++++++-------
 .../integration/io/DebeziumMySqlSourceTester.java  |  6 +++---
 ...er.java => DebeziumPostgreSqlSourceTester.java} | 22 +++++++++++-----------
 4 files changed, 23 insertions(+), 23 deletions(-)

diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgresqlContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
similarity index 91%
rename from 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgresqlContainer.java
rename to 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
index e85c27c..8f32c05 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgresqlContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
@@ -21,14 +21,14 @@ package org.apache.pulsar.tests.integration.containers;
 
 import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
-public class DebeziumPostgresqlContainer extends 
ChaosContainer<DebeziumPostgresqlContainer> {
+public class DebeziumPostgreSqlContainer extends 
ChaosContainer<DebeziumPostgreSqlContainer> {
 
     public static final String NAME = "debezium-postgresql-example";
     static final Integer[] PORTS = { 5432 };
 
     private static final String IMAGE_NAME = "debezium/example-postgres:0.10";
 
-    public DebeziumPostgresqlContainer(String clusterName) {
+    public DebeziumPostgreSqlContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
         this.withEnv("POSTGRES_USER", "postgres");
         this.withEnv("POSTGRES_PASSWORD", "postgres");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 7fda45b..23beec8 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
-import 
org.apache.pulsar.tests.integration.containers.DebeziumPostgresqlContainer;
+import 
org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -119,8 +119,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     }
 
     @Test
-    public void testDebeziumPostgresqlSource() throws Exception {
-        testDebeziumPostgresqlConnect();
+    public void testDebeziumPostgreSqlSource() throws Exception {
+        testDebeziumPostgreSqlConnect();
     }
 
     private void testSink(SinkTester tester, boolean builtin) throws Exception 
{
@@ -1966,7 +1966,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
-    private  void testDebeziumPostgresqlConnect() throws Exception {
+    private  void testDebeziumPostgreSqlConnect() throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
@@ -1991,11 +1991,11 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                 .subscribe();
 
         @Cleanup
-        DebeziumPostgresqlSourceTester sourceTester = new 
DebeziumPostgresqlSourceTester(pulsarCluster);
+        DebeziumPostgreSqlSourceTester sourceTester = new 
DebeziumPostgreSqlSourceTester(pulsarCluster);
 
         // setup debezium postgresql server
-        DebeziumPostgresqlContainer postgresqlContainer = new 
DebeziumPostgresqlContainer(pulsarCluster.getClusterName());
-        sourceTester.setServiceContainer(postgresqlContainer);
+        DebeziumPostgreSqlContainer postgreSqlContainer = new 
DebeziumPostgreSqlContainer(pulsarCluster.getClusterName());
+        sourceTester.setServiceContainer(postgreSqlContainer);
 
         // prepare the testing environment for source
         prepareSource(sourceTester);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index 2d1b4b5..580bf2c 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -91,9 +91,9 @@ public class DebeziumMySqlSourceTester extends 
SourceTester<DebeziumMySQLContain
         Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, 
TimeUnit.SECONDS);
         while(msg != null) {
             recordsNumber ++;
-            log.info("Received message: {}.", msg.getValue());
-            String key = new String(msg.getValue().getKey());
-            String value = new String(msg.getValue().getValue());
+            final String key = new String(msg.getValue().getKey());
+            final String value = new String(msg.getValue().getValue());
+            log.info("Received message: key = {}, value = {}.", key, value);
             
Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
             
Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
             consumer.acknowledge(msg);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgresqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
similarity index 84%
rename from 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgresqlSourceTester.java
rename to 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
index e95db22..d246258 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgresqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.schema.KeyValue;
-import 
org.apache.pulsar.tests.integration.containers.DebeziumPostgresqlContainer;
+import 
org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.Assert;
@@ -42,23 +42,23 @@ import java.util.concurrent.TimeUnit;
  * which is a Postgresql database server preconfigured with an inventory 
database.
  */
 @Slf4j
-public class DebeziumPostgresqlSourceTester extends 
SourceTester<DebeziumPostgresqlContainer> implements Closeable {
+public class DebeziumPostgreSqlSourceTester extends 
SourceTester<DebeziumPostgreSqlContainer> implements Closeable {
 
     private static final String NAME = "debezium-postgres";
 
     private final String pulsarServiceUrl;
 
     @Getter
-    private DebeziumPostgresqlContainer debeziumPostgresqlContainer;
+    private DebeziumPostgreSqlContainer debeziumPostgresqlContainer;
 
     private final PulsarCluster pulsarCluster;
 
-    public DebeziumPostgresqlSourceTester(PulsarCluster cluster) {
+    public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
         super(NAME);
         this.pulsarCluster = cluster;
         pulsarServiceUrl = "pulsar://pulsar-proxy:" + 
PulsarContainer.BROKER_PORT;
 
-        sourceConfig.put("database.hostname", 
DebeziumPostgresqlContainer.NAME);
+        sourceConfig.put("database.hostname", 
DebeziumPostgreSqlContainer.NAME);
         sourceConfig.put("database.port", "5432");
         sourceConfig.put("database.user", "postgres");
         sourceConfig.put("database.password", "postgres");
@@ -71,10 +71,10 @@ public class DebeziumPostgresqlSourceTester extends 
SourceTester<DebeziumPostgre
     }
 
     @Override
-    public void setServiceContainer(DebeziumPostgresqlContainer container) {
+    public void setServiceContainer(DebeziumPostgreSqlContainer container) {
         log.info("start debezium postgresql server container.");
         debeziumPostgresqlContainer = container;
-        pulsarCluster.startService(DebeziumPostgresqlContainer.NAME, 
debeziumPostgresqlContainer);
+        pulsarCluster.startService(DebeziumPostgreSqlContainer.NAME, 
debeziumPostgresqlContainer);
     }
 
     @Override
@@ -93,9 +93,9 @@ public class DebeziumPostgresqlSourceTester extends 
SourceTester<DebeziumPostgre
         Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, 
TimeUnit.SECONDS);
         while(msg != null) {
             recordsNumber ++;
-            log.info("Received message: {}.", msg.getValue());
-            String key = new String(msg.getValue().getKey());
-            String value = new String(msg.getValue().getValue());
+            final String key = new String(msg.getValue().getKey());
+            final String value = new String(msg.getValue().getValue());
+            log.info("Received message: key = {}, value = {}.", key, value);
             
Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
             
Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
             consumer.acknowledge(msg);
@@ -109,7 +109,7 @@ public class DebeziumPostgresqlSourceTester extends 
SourceTester<DebeziumPostgre
     @Override
     public void close() {
         if (pulsarCluster != null) {
-            pulsarCluster.stopService(DebeziumPostgresqlContainer.NAME, 
debeziumPostgresqlContainer);
+            pulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, 
debeziumPostgresqlContainer);
         }
     }
 

Reply via email to