This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/restructuring in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 03ff7854355a8d33666921964f30f0404ff73fd7 Author: Naburun Nag <n...@cs.wisc.edu> AuthorDate: Tue Feb 18 19:39:43 2020 -0800 Fixing the sink test issue --- .gitignore | 2 +- dunit/locator/locator63530view.dat | Bin 198 -> 0 bytes .../java/org/geode/kafka/GeodeAsSinkDUnitTest.java | 20 ++++++++------------ .../java/org/geode/kafka/GeodeKafkaTestUtils.java | 1 - .../org/geode/kafka/LocatorLauncherWrapper.java | 3 +-- .../java/org/geode/kafka/ServerLauncherWrapper.java | 4 ++-- .../org/geode/kafka/WorkerAndHerderWrapper.java | 9 +++------ 7 files changed, 15 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index 80b701f..2e856f5 100644 --- a/.gitignore +++ b/.gitignore @@ -235,4 +235,4 @@ nbdist/ .nb-gradle/ .idea/ -./dunit/ +**/dunit diff --git a/dunit/locator/locator63530view.dat b/dunit/locator/locator63530view.dat deleted file mode 100644 index 10a68c2..0000000 Binary files a/dunit/locator/locator63530view.dat and /dev/null differ diff --git a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java index 5c94575..b0a11e6 100644 --- a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java +++ b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java @@ -1,7 +1,6 @@ package org.geode.kafka; import static org.awaitility.Awaitility.await; -import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer; import static org.geode.kafka.GeodeKafkaTestUtils.createProducer; import static org.geode.kafka.GeodeKafkaTestUtils.createTopic; import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic; @@ -10,15 +9,12 @@ import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties; import static org.geode.kafka.GeodeKafkaTestUtils.startKafka; import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster; import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper; -import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed; import static 
org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.concurrent.TimeUnit; -import com.fasterxml.jackson.module.scala.ser.SymbolSerializerModule; import kafka.zk.KafkaZkClient; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.utils.Time; @@ -39,7 +35,7 @@ import org.apache.geode.test.dunit.rules.ClientVM; import org.apache.geode.test.dunit.rules.ClusterStartupRule; import org.apache.geode.test.dunit.rules.MemberVM; -//@RunWith(Parameterized.class) +@RunWith(Parameterized.class) public class GeodeAsSinkDUnitTest { @Rule public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); @@ -82,13 +78,13 @@ public class GeodeAsSinkDUnitTest { return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}}); } - private /*final*/ int numTask = 1; - private /*final*/ int numPartition = 1; + private final int numTask; + private final int numPartition; -// public GeodeAsSinkDUnitTest(int numTask, int numPartition) { -// this.numTask = numTask; -// this.numPartition = numPartition; -// } + public GeodeAsSinkDUnitTest(int numTask, int numPartition) { + this.numTask = numTask; + this.numPartition = numPartition; + } @Test public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception { @@ -161,5 +157,5 @@ public class GeodeAsSinkDUnitTest { kafkaLocalCluster.stop(); } - } + } } diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java index 27f9391..abe0d6a 100644 --- a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java +++ b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java @@ -148,7 +148,6 @@ public class GeodeKafkaTestUtils { await().atMost(10, TimeUnit.SECONDS).until(() -> { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord<String, String> record : 
records) { - System.out.println("NABA :: " + record); valueReceived.incrementAndGet(); } return valueReceived.get() == numEvents; diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java index f0a0a1d..f24367c 100644 --- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java +++ b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java @@ -14,7 +14,6 @@ */ package org.geode.kafka; -import java.io.File; import java.io.IOException; import java.util.Properties; @@ -30,7 +29,7 @@ public class LocatorLauncherWrapper { properties.setProperty(ConfigurationProperties.NAME, "locator1"); Locator.startLocatorAndDS(10334, - null/*new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log")*/, properties); + null/* new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log") */, properties); while (true) { } diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java index 026012c..4ab75cd 100644 --- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java +++ b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java @@ -48,8 +48,8 @@ public class ServerLauncherWrapper { .set(ConfigurationProperties.LOCATORS, locatorString) .set(ConfigurationProperties.NAME, "server-1") -// .set(ConfigurationProperties.LOG_FILE, -// "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log") + // .set(ConfigurationProperties.LOG_FILE, + // "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log") .set(ConfigurationProperties.LOG_LEVEL, "info") // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile) .create(); diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java index 0d8ad40..a3efc23 100644 --- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java @@ -18,7 +18,6 @@ import 
static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BIND import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -43,19 +42,18 @@ public class WorkerAndHerderWrapper { String offsetPath = "/tmp/connect.offsets"; String regionToTopicBinding = GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS; String topicToRegionBinding = GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS; - String testTopicForSink = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; + String sinkTopic = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; String locatorString = null; System.out.println("MaxTask " + maxTasks); if (args.length == 7) { String sourceRegion = args[1]; String sinkRegion = args[2]; String sourceTopic = args[3]; - String sinkTopic = args[4]; + sinkTopic = args[4]; offsetPath = args[5]; regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]"; topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]"; locatorString = args[6]; - System.out.println("NABA args = " + Arrays.deepToString(args)); } Map props = new HashMap(); @@ -104,8 +102,7 @@ public class WorkerAndHerderWrapper { sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks); sinkProps.put(TOPIC_TO_REGION_BINDINGS, topicToRegionBinding); sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString); - System.out.println("NABA : binding " + topicToRegionBinding); - sinkProps.put("topics", testTopicForSink); + sinkProps.put("topics", sinkTopic); herder.putConnectorConfig( sinkProps.get(ConnectorConfig.NAME_CONFIG),