This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/restructuring
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git

commit 035d794580148f71f30dd5e0eac640bfa7992308
Author: Naburun Nag <n...@cs.wisc.edu>
AuthorDate: Mon Feb 10 16:32:16 2020 -0800

    Adding geode-dunit to run the tests.
---
 build.gradle                                       |   2 +-
 .../geode/kafka/GeodeKafkaConnectorTestBase.java   | 171 ++++++++++++++++++++
 .../java/org/geode/kafka/GeodeKafkaTestUtils.java  | 176 +++++++++++++++++++++
 3 files changed, 348 insertions(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index 2b7f853..33f9b9c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -75,7 +75,7 @@ dependencies {
     testCompile(group: 'junit', name: 'junit', version: '4.12')
     testCompile('org.mockito:mockito-core:3.2.4')
     testCompile('pl.pragmatists:JUnitParams:1.1.1')
-
+    testCompile(group: 'org.apache.geode', name: 'geode-dunit', version: 
'1.11.0')
     testImplementation 'org.awaitility:awaitility:4.0.2'
 }
 
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java 
b/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java
new file mode 100644
index 0000000..43723a9
--- /dev/null
+++ b/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.geode.kafka;
+
+import static org.awaitility.Awaitility.await;
+import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
+import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.putDataIntoGeodeCluster;
+import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeClientAndRegion;
+import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeLocator;
+import static 
org.geode.kafka.GeodeKafkaTestUtils.startGeodeServerAndCreateSourceRegion;
+import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
+import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.version.VersionManager;
+
+public class GeodeKafkaConnectorTestBase {
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private static boolean debug = true;
+
+  public static String TEST_REGION_TO_TOPIC_BINDINGS = 
"[someRegionForSource:someTopicForSource]";
+  public static String TEST_TOPIC_TO_REGION_BINDINGS = 
"[someTopicForSink:someRegionForSink]";
+
+  public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource";
+  public static String TEST_REGION_FOR_SOURCE = "someRegionForSource";
+  public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
+  public static String TEST_REGION_FOR_SINK = "someRegionForSink";
+
+  private static ZooKeeperLocalCluster zooKeeperLocalCluster;
+  private static KafkaLocalCluster kafkaLocalCluster;
+  private static GeodeLocalCluster geodeLocalCluster;
+  private static WorkerAndHerderCluster workerAndHerderCluster;
+  private static Consumer<String, String> consumer;
+
+  private static Properties getZooKeeperProperties() throws IOException {
+    Properties properties = new Properties();
+    properties.setProperty("dataDir",
+        (debug) ? "/tmp/zookeeper" : 
temporaryFolder.newFolder("zookeeper").getAbsolutePath());
+    properties.setProperty("clientPort", "2181");
+    properties.setProperty("tickTime", "2000");
+    return properties;
+  }
+
+  private static Properties getKafkaConfig() throws IOException {
+    int BROKER_PORT = 9092;
+    Properties props = new Properties();
+
+    props.put("broker.id", "0");
+    props.put("zookeeper.connect", "localhost:2181");
+    props.put("host.name", "localHost");
+    props.put("port", BROKER_PORT);
+    props.put("offsets.topic.replication.factor", "1");
+
+    // Specifically GeodeKafka connector configs
+    return props;
+  }
+
+
+  // @Before
+  // public void setup()
+  // throws IOException, QuorumPeerConfig.ConfigException, 
InterruptedException {
+  //
+  // System.out.println("NABA : kafka started");
+  // }
+
+  @After
+  public void cleanUp() {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
+        false,
+        200000,
+        15000,
+        10,
+        Time.SYSTEM,
+        "myGroup",
+        "myMetricType",
+        null);
+
+    zkClient.close();
+    kafkaLocalCluster.stop();
+  }
+
+  @Test
+  public void doNothing() {
+
+  }
+
+  @Test
+  public void endToEndSourceTest2() throws Exception {
+    startZooKeeper(getZooKeeperProperties());
+    startKafka(getKafkaConfig());
+    Host host = Host.getHost(0);
+    VM server = host.getVM(VersionManager.CURRENT_VERSION, 0);
+    VM locator = host.getVM(VersionManager.CURRENT_VERSION, 1);
+    VM client = host.getVM(VersionManager.CURRENT_VERSION, 2);
+    WorkerAndHerderCluster workerAndHerderCluster = null;
+    // Start the Apache Geode locator and server and create the source region
+    startGeodeLocator(locator);
+    // Topic Name and all region names must be the same
+    String topicName = testName.getMethodName();
+    startGeodeServerAndCreateSourceRegion(server, topicName);
+    startGeodeClientAndRegion(client, topicName);
+
+    try {
+      createTopic(topicName, 1, 1);
+      // Create workers and herder cluster
+      workerAndHerderCluster = startWorkerAndHerderCluster(1);
+
+      consumer = createConsumer(topicName);
+
+      // Insert data into the Apache Geode source
+      putDataIntoGeodeCluster(client, topicName, 10);
+      // Assert that all the data inserted in Apache Geode source is received 
by the consumer
+      AtomicInteger valueReceived = new AtomicInteger(0);
+      await().atMost(10, TimeUnit.SECONDS).until(() -> {
+        ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofSeconds(2));
+        for (ConsumerRecord<String, String> record : records) {
+          valueReceived.incrementAndGet();
+        }
+        return valueReceived.get() == 10;
+      });
+    } finally {
+      deleteTopic(topicName);
+      if (workerAndHerderCluster != null) {
+        workerAndHerderCluster.stop();
+      }
+    }
+  }
+
+}
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java 
b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
new file mode 100644
index 0000000..a979e3b
--- /dev/null
+++ b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.geode.kafka;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.test.dunit.VM;
+
+public class GeodeKafkaTestUtils {
+  protected static ZooKeeperLocalCluster startZooKeeper(Properties 
zookeeperProperties)
+      throws IOException, QuorumPeerConfig.ConfigException {
+    ZooKeeperLocalCluster zooKeeperLocalCluster = new 
ZooKeeperLocalCluster(zookeeperProperties);
+    zooKeeperLocalCluster.start();
+    return zooKeeperLocalCluster;
+  }
+
+  protected static KafkaLocalCluster startKafka(Properties kafkaProperties)
+      throws IOException, InterruptedException, 
QuorumPeerConfig.ConfigException {
+    KafkaLocalCluster kafkaLocalCluster = new 
KafkaLocalCluster(kafkaProperties);
+    kafkaLocalCluster.start();
+    return kafkaLocalCluster;
+  }
+
+  protected static void createTopic(String topicName, int numPartitions, int 
replicationFactor) {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 
200000,
+        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+
+    Properties topicProperties = new Properties();
+    topicProperties.put("flush.messages", "1");
+    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+    adminZkClient.createTopic(topicName, numPartitions, replicationFactor, 
topicProperties,
+        RackAwareMode.Disabled$.MODULE$);
+  }
+
+  protected static void deleteTopic(String topicName) {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 
200000,
+        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+    adminZkClient.deleteTopic(topicName);
+  }
+
+  // consumer props, less important, just for testing?
+  public static Consumer<String, String> createConsumer(String 
testTopicForSource) {
+    final Properties props = new Properties();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ConsumerConfig.GROUP_ID_CONFIG,
+        "myGroup");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getName());
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+    // Create the consumer using props.
+    final Consumer<String, String> consumer =
+        new KafkaConsumer<>(props);
+    // Subscribe to the topic.
+    consumer.subscribe(Collections.singletonList(testTopicForSource));
+    return consumer;
+  }
+
+  // consumer props, less important, just for testing?
+  public static Producer<String, String> createProducer() {
+    final Properties props = new Properties();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        StringSerializer.class.getName());
+
+    // Create the producer using props.
+    final Producer<String, String> producer =
+        new KafkaProducer<>(props);
+    return producer;
+
+  }
+
+  public static void startGeodeLocator(VM locatorVM) {
+    locatorVM.invoke(() -> {
+      Properties properties = new Properties();
+      properties.setProperty(ConfigurationProperties.NAME, "locator1");
+      Locator.startLocatorAndDS(10334,
+          null, properties);
+    });
+  }
+
+  public static void startGeodeServerAndCreateSourceRegion(VM serverVM, String 
regionName) {
+    serverVM.invoke(() -> {
+      Properties properties = new Properties();
+      Cache cache = new CacheFactory(properties)
+          .set(ConfigurationProperties.LOCATORS, "localhost[10334]")
+          .set(ConfigurationProperties.NAME, "server-1")
+          .create();
+      CacheServer cacheServer = cache.addCacheServer();
+      cacheServer.setPort(0);
+      cacheServer.start();
+
+      cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+    });
+  }
+
+  protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int 
maxTasks) {
+    WorkerAndHerderCluster workerAndHerderCluster = new 
WorkerAndHerderCluster();
+    try {
+      workerAndHerderCluster.start(String.valueOf(maxTasks));
+      Thread.sleep(20000);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not start the worker and herder 
cluster" + e);
+    }
+    return workerAndHerderCluster;
+  }
+
+  protected static void startGeodeClientAndRegion(VM client, String 
regionName) {
+    client.invoke(() -> {
+      ClientCache clientCache = new ClientCacheFactory()
+          .addPoolLocator("localhost", 10334)
+          .create();
+
+      
clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+    });
+  }
+
+  protected static void putDataIntoGeodeCluster(VM client, String regionName, 
int num) {
+    client.invoke(() -> {
+      ClientCache clientCache = new ClientCacheFactory()
+          .addPoolLocator("localhost", 10334)
+          .create();
+      Region region = clientCache.getRegion(regionName);
+      for (int i = 0; i < num; i++) {
+        region.put("KEY" + i, "VALUE" + i);
+      }
+    });
+
+  }
+
+
+}

Reply via email to