This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/restructuring
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git

commit 03ff7854355a8d33666921964f30f0404ff73fd7
Author: Naburun Nag <n...@cs.wisc.edu>
AuthorDate: Tue Feb 18 19:39:43 2020 -0800

    Fixing the sink test issue
---
 .gitignore                                          |   2 +-
 dunit/locator/locator63530view.dat                  | Bin 198 -> 0 bytes
 .../java/org/geode/kafka/GeodeAsSinkDUnitTest.java  |  20 ++++++++------------
 .../java/org/geode/kafka/GeodeKafkaTestUtils.java   |   1 -
 .../org/geode/kafka/LocatorLauncherWrapper.java     |   3 +--
 .../java/org/geode/kafka/ServerLauncherWrapper.java |   4 ++--
 .../org/geode/kafka/WorkerAndHerderWrapper.java     |   9 +++------
 7 files changed, 15 insertions(+), 24 deletions(-)

diff --git a/.gitignore b/.gitignore
index 80b701f..2e856f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -235,4 +235,4 @@ nbdist/
 .nb-gradle/
 
 .idea/
-./dunit/
+**/dunit
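(Note on the .gitignore change above: a pattern written as "./dunit/" never matches anything, because git matches paths without a leading "./"; "**/dunit" ignores a dunit directory at any depth. That also explains why the stray locator view file in the next hunk could have been committed in the first place and is deleted here.)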
diff --git a/dunit/locator/locator63530view.dat b/dunit/locator/locator63530view.dat
deleted file mode 100644
index 10a68c2..0000000
Binary files a/dunit/locator/locator63530view.dat and /dev/null differ
diff --git a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
index 5c94575..b0a11e6 100644
--- a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -1,7 +1,6 @@
 package org.geode.kafka;
 
 import static org.awaitility.Awaitility.await;
-import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
 import static org.geode.kafka.GeodeKafkaTestUtils.createProducer;
 import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
 import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
@@ -10,15 +9,12 @@ import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties;
 import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
 import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
 import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
-import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-import com.fasterxml.jackson.module.scala.ser.SymbolSerializerModule;
 import kafka.zk.KafkaZkClient;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Time;
@@ -39,7 +35,7 @@ import org.apache.geode.test.dunit.rules.ClientVM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 
-//@RunWith(Parameterized.class)
+@RunWith(Parameterized.class)
 public class GeodeAsSinkDUnitTest {
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
@@ -82,13 +78,13 @@ public class GeodeAsSinkDUnitTest {
     return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}});
   }
 
-  private /*final*/ int numTask = 1;
-  private /*final*/ int numPartition = 1;
+  private final int numTask;
+  private final int numPartition;
 
-//  public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
-//    this.numTask = numTask;
-//    this.numPartition = numPartition;
-//  }
+  public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
+    this.numTask = numTask;
+    this.numPartition = numPartition;
+  }
 
   @Test
  public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {
@@ -161,5 +157,5 @@ public class GeodeAsSinkDUnitTest {
       kafkaLocalCluster.stop();
     }
 
-    }
+  }
 }
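
The hunks above re-enable JUnit 4's Parameterized runner: each Object[] row returned by the @Parameters method is passed to the constructor, so every @Test runs once per (numTask, numPartition) pair and the fields can be final again. A minimal, self-contained sketch of the pattern follows; the class and test names are illustrative, not taken from the connector:

import java.util.Arrays;
import java.util.Collection;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class ParameterizedSketchTest {

  // One Object[] row per run; the runner instantiates the class once per row.
  @Parameters(name = "tasks={0}, partitions={1}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}});
  }

  private final int numTask;
  private final int numPartition;

  // The runner injects a parameter row through this constructor,
  // which is why the fields can stay final.
  public ParameterizedSketchTest(int numTask, int numPartition) {
    this.numTask = numTask;
    this.numPartition = numPartition;
  }

  @Test
  public void runsOncePerParameterRow() {
    Assert.assertTrue(numTask > 0 && numPartition > 0);
  }
}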
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
index 27f9391..abe0d6a 100644
--- a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
+++ b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
@@ -148,7 +148,6 @@ public class GeodeKafkaTestUtils {
     await().atMost(10, TimeUnit.SECONDS).until(() -> {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
       for (ConsumerRecord<String, String> record : records) {
-        System.out.println("NABA :: " + record);
         valueReceived.incrementAndGet();
       }
       return valueReceived.get() == numEvents;
diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
index f0a0a1d..f24367c 100644
--- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
@@ -14,7 +14,6 @@
  */
 package org.geode.kafka;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 
@@ -30,7 +29,7 @@ public class LocatorLauncherWrapper {
     properties.setProperty(ConfigurationProperties.NAME, "locator1");
 
     Locator.startLocatorAndDS(10334,
-        null/*new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log")*/, properties);
+        null/* new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log") */, properties);
     while (true) {
 
     }
diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
index 026012c..4ab75cd 100644
--- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
@@ -48,8 +48,8 @@ public class ServerLauncherWrapper {
         .set(ConfigurationProperties.LOCATORS, locatorString)
         .set(ConfigurationProperties.NAME,
             "server-1")
-//        .set(ConfigurationProperties.LOG_FILE,
-//            "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
+        // .set(ConfigurationProperties.LOG_FILE,
+        // "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
         .set(ConfigurationProperties.LOG_LEVEL, "info")
         // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
         .create();
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
index 0d8ad40..a3efc23 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
@@ -18,7 +18,6 @@ import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BIND
 import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,19 +42,18 @@ public class WorkerAndHerderWrapper {
     String offsetPath = "/tmp/connect.offsets";
    String regionToTopicBinding = GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
    String topicToRegionBinding = GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
-    String testTopicForSink = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
+    String sinkTopic = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
     String locatorString = null;
     System.out.println("MaxTask " + maxTasks);
     if (args.length == 7) {
       String sourceRegion = args[1];
       String sinkRegion = args[2];
       String sourceTopic = args[3];
-      String sinkTopic = args[4];
+      sinkTopic = args[4];
       offsetPath = args[5];
       regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
       topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
       locatorString = args[6];
-      System.out.println("NABA args = " + Arrays.deepToString(args));
     }
 
     Map props = new HashMap();
@@ -104,8 +102,7 @@ public class WorkerAndHerderWrapper {
     sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
     sinkProps.put(TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
     sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
-    System.out.println("NABA : binding " + topicToRegionBinding);
-    sinkProps.put("topics", testTopicForSink);
+    sinkProps.put("topics", sinkTopic);
 
     herder.putConnectorConfig(
         sinkProps.get(ConnectorConfig.NAME_CONFIG),
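
The substantive fix is in the two WorkerAndHerderWrapper hunks: sinkTopic now defaults to the test constant but is overwritten by args[4] when the DUnit test passes its own arguments, and the connector's "topics" property is set from that variable instead of the old, always-default testTopicForSink local. Previously the sink connector could subscribe to a topic the test never produced to. A condensed, self-contained sketch of the corrected wiring (the constant's value is a stand-in for GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; the argument position matches the diff):

import java.util.HashMap;
import java.util.Map;

public class SinkTopicWiringSketch {
  // Stand-in value; the real code uses GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK.
  static final String DEFAULT_SINK_TOPIC = "someTopicForSink";

  public static void main(String[] args) {
    String sinkTopic = DEFAULT_SINK_TOPIC;
    if (args.length == 7) {
      sinkTopic = args[4]; // the DUnit test supplies its own sink topic
    }
    Map<String, String> sinkProps = new HashMap<>();
    // Before the fix this was always the default constant,
    // even when args[4] named a different topic.
    sinkProps.put("topics", sinkTopic);
    System.out.println(sinkProps);
  }
}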
