This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch feature/restructuring in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 035d794580148f71f30dd5e0eac640bfa7992308 Author: Naburun Nag <n...@cs.wisc.edu> AuthorDate: Mon Feb 10 16:32:16 2020 -0800 Adding geode-dunit to run the tests. --- build.gradle | 2 +- .../geode/kafka/GeodeKafkaConnectorTestBase.java | 171 ++++++++++++++++++++ .../java/org/geode/kafka/GeodeKafkaTestUtils.java | 176 +++++++++++++++++++++ 3 files changed, 348 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 2b7f853..33f9b9c 100644 --- a/build.gradle +++ b/build.gradle @@ -75,7 +75,7 @@ dependencies { testCompile(group: 'junit', name: 'junit', version: '4.12') testCompile('org.mockito:mockito-core:3.2.4') testCompile('pl.pragmatists:JUnitParams:1.1.1') - + testCompile(group: 'org.apache.geode', name: 'geode-dunit', version: '1.11.0') testImplementation 'org.awaitility:awaitility:4.0.2' } diff --git a/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java b/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java new file mode 100644 index 0000000..43723a9 --- /dev/null +++ b/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */
+package org.geode.kafka;
+
+import static org.awaitility.Awaitility.await;
+import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
+import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.putDataIntoGeodeCluster;
+import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeClientAndRegion;
+import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeLocator;
+import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeServerAndCreateSourceRegion;
+import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
+import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.version.VersionManager;
+
+/**
+ * End-to-end test of the Geode-to-Kafka source path. It starts ZooKeeper and Kafka through
+ * {@link GeodeKafkaTestUtils}, starts a Geode locator, server and client in DUnit VMs, and checks
+ * that entries put into a Geode region are received by a Kafka consumer subscribed to the topic of
+ * the same name.
+ */
+public class GeodeKafkaConnectorTestBase {
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+  // When true, ZooKeeper's dataDir is the fixed /tmp/zookeeper instead of a temporary folder.
+  private static boolean debug = true;
+
+  public static String TEST_REGION_TO_TOPIC_BINDINGS =
+      "[someRegionForSource:someTopicForSource]";
+  public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]";
+
+  public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource";
+  public static String TEST_REGION_FOR_SOURCE = "someRegionForSource";
+  public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
+  public static String TEST_REGION_FOR_SINK = "someRegionForSink";
+
+  private static ZooKeeperLocalCluster zooKeeperLocalCluster;
+  private static KafkaLocalCluster kafkaLocalCluster;
+  private static GeodeLocalCluster geodeLocalCluster;
+  private static WorkerAndHerderCluster workerAndHerderCluster;
+  private static Consumer<String, String> consumer;
+
+  /**
+   * Builds the ZooKeeper settings used by the test: data directory, client port 2181 and a tick
+   * time of 2000.
+   *
+   * @return the ZooKeeper properties
+   * @throws IOException if the temporary data folder cannot be created
+   */
+  private static Properties getZooKeeperProperties() throws IOException {
+    String dataDir =
+        debug ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath();
+
+    Properties properties = new Properties();
+    properties.setProperty("dataDir", dataDir);
+    properties.setProperty("clientPort", "2181");
+    properties.setProperty("tickTime", "2000");
+    return properties;
+  }
+
+  /**
+   * Builds the configuration of the single Kafka broker (id 0, port 9092) that connects to the
+   * ZooKeeper instance on localhost:2181.
+   *
+   * @return the broker properties
+   */
+  private static Properties getKafkaConfig() throws IOException {
+    int brokerPort = 9092;
+    Properties props = new Properties();
+
+    props.put("broker.id", "0");
+    props.put("zookeeper.connect", "localhost:2181");
+    props.put("host.name", "localHost");
+    props.put("port", brokerPort);
+    props.put("offsets.topic.replication.factor", "1");
+
+    return props;
+  }
+
+  /**
+   * Stops the Kafka broker if a test started one. The guard matters because {@link #doNothing()}
+   * never starts a broker, so an unconditional stop fails with a NullPointerException.
+   */
+  @After
+  public void cleanUp() {
+    // NOTE(review): ZooKeeper is not stopped here; the ZooKeeperLocalCluster API is not visible
+    // in this change, so confirm whether it offers a stop method that should be called as well.
+    if (kafkaLocalCluster != null) {
+      kafkaLocalCluster.stop();
+      kafkaLocalCluster = null;
+    }
+  }
+
+  @Test
+  public void doNothing() {
+
+  }
+
+  /**
+   * Puts ten entries into a Geode region and waits up to ten seconds for a Kafka consumer on the
+   * topic with the same name to receive ten records.
+   */
+  @Test
+  public void endToEndSourceTest2() throws Exception {
+    // Keep the handles in the static fields so that cleanUp() can stop what was started here.
+    zooKeeperLocalCluster = startZooKeeper(getZooKeeperProperties());
+    kafkaLocalCluster = startKafka(getKafkaConfig());
+
+    Host host = Host.getHost(0);
+    VM server = host.getVM(VersionManager.CURRENT_VERSION, 0);
+    VM locator = host.getVM(VersionManager.CURRENT_VERSION, 1);
+    VM client = host.getVM(VersionManager.CURRENT_VERSION, 2);
+    WorkerAndHerderCluster workers = null;
+    // Start the Apache Geode locator and server and create the source region
+    startGeodeLocator(locator);
+    // Topic Name and all region names must be the same
+    String topicName = testName.getMethodName();
+    startGeodeServerAndCreateSourceRegion(server, topicName);
+    startGeodeClientAndRegion(client, topicName);
+
+    try {
+      createTopic(topicName, 1, 1);
+      // Create workers and herder cluster
+      workers = startWorkerAndHerderCluster(1);
+
+      consumer = createConsumer(topicName);
+
+      // Insert data into the Apache Geode source
+      putDataIntoGeodeCluster(client, topicName, 10);
+      // Assert that all the data inserted in Apache Geode source is received by the consumer
+      AtomicInteger valueReceived = new AtomicInteger(0);
+      await().atMost(10, TimeUnit.SECONDS).until(() -> {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
+        return valueReceived.addAndGet(records.count()) == 10;
+      });
+    } finally {
+      try {
+        if (consumer != null) {
+          consumer.close();
+          consumer = null;
+        }
+        deleteTopic(topicName);
+      } finally {
+        // Stop the workers even when closing the consumer or deleting the topic fails.
+        if (workers != null) {
+          workers.stop();
+        }
+      }
+    }
+  }
+
+}
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
new file mode 100644
index 0000000..a979e3b
--- /dev/null
+++ b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.geode.kafka;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.test.dunit.VM;
+
+/**
+ * Static helpers shared by the connector tests: starting ZooKeeper, Kafka and the worker/herder
+ * cluster, managing Kafka topics and clients, and starting Geode members inside DUnit VMs.
+ */
+public class GeodeKafkaTestUtils {
+  // Endpoints shared by every helper; they match the ports used in the test configuration.
+  private static final String ZOOKEEPER_CONNECT = "localhost:2181";
+  private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+  private static final int LOCATOR_PORT = 10334;
+
+  /**
+   * Creates and starts a {@link ZooKeeperLocalCluster} with the given properties.
+   *
+   * @param zookeeperProperties the ZooKeeper configuration
+   * @return the started cluster
+   */
+  protected static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties)
+      throws IOException, QuorumPeerConfig.ConfigException {
+    ZooKeeperLocalCluster zooKeeperLocalCluster = new ZooKeeperLocalCluster(zookeeperProperties);
+    zooKeeperLocalCluster.start();
+    return zooKeeperLocalCluster;
+  }
+
+  /**
+   * Creates and starts a {@link KafkaLocalCluster} with the given properties.
+   *
+   * @param kafkaProperties the broker configuration
+   * @return the started cluster
+   */
+  protected static KafkaLocalCluster startKafka(Properties kafkaProperties)
+      throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
+    KafkaLocalCluster kafkaLocalCluster = new KafkaLocalCluster(kafkaProperties);
+    kafkaLocalCluster.start();
+    return kafkaLocalCluster;
+  }
+
+  /** Opens a ZooKeeper client for topic administration; the caller must close it. */
+  private static KafkaZkClient newZkClient() {
+    return KafkaZkClient.apply(ZOOKEEPER_CONNECT, false, 200000,
+        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+  }
+
+  /**
+   * Creates a Kafka topic with {@code flush.messages} set to 1.
+   *
+   * @param topicName the topic to create
+   * @param numPartitions the number of partitions
+   * @param replicationFactor the replication factor
+   */
+  protected static void createTopic(String topicName, int numPartitions, int replicationFactor) {
+    KafkaZkClient zkClient = newZkClient();
+    try {
+      Properties topicProperties = new Properties();
+      topicProperties.put("flush.messages", "1");
+      AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+      adminZkClient.createTopic(topicName, numPartitions, replicationFactor, topicProperties,
+          RackAwareMode.Disabled$.MODULE$);
+    } finally {
+      // Close the client so each call does not leave a ZooKeeper session open.
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Deletes a Kafka topic.
+   *
+   * @param topicName the topic to delete
+   */
+  protected static void deleteTopic(String topicName) {
+    KafkaZkClient zkClient = newZkClient();
+    try {
+      AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+      adminZkClient.deleteTopic(topicName);
+    } finally {
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Creates a String/String consumer in group "myGroup" that reads from the earliest offset and
+   * subscribes it to the given topic. The caller is responsible for closing it.
+   *
+   * @param testTopicForSource the topic to subscribe to
+   * @return the subscribed consumer
+   */
+  public static Consumer<String, String> createConsumer(String testTopicForSource) {
+    final Properties props = new Properties();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+    final Consumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.subscribe(Collections.singletonList(testTopicForSource));
+    return consumer;
+  }
+
+  /**
+   * Creates a String/String producer connected to the local broker. The caller is responsible for
+   * closing it.
+   *
+   * @return the producer
+   */
+  public static Producer<String, String> createProducer() {
+    final Properties props = new Properties();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+    return new KafkaProducer<>(props);
+  }
+
+  /**
+   * Starts a Geode locator named "locator1" on port 10334 inside the given VM.
+   *
+   * @param locatorVM the DUnit VM that hosts the locator
+   */
+  public static void startGeodeLocator(VM locatorVM) {
+    locatorVM.invoke(() -> {
+      Properties properties = new Properties();
+      properties.setProperty(ConfigurationProperties.NAME, "locator1");
+      Locator.startLocatorAndDS(LOCATOR_PORT, null, properties);
+    });
+  }
+
+  /**
+   * Starts a Geode cache named "server-1" with a cache server inside the given VM and creates a
+   * partitioned region in it.
+   *
+   * @param serverVM the DUnit VM that hosts the server
+   * @param regionName the name of the region to create
+   */
+  public static void startGeodeServerAndCreateSourceRegion(VM serverVM, String regionName) {
+    serverVM.invoke(() -> {
+      Properties properties = new Properties();
+      Cache cache = new CacheFactory(properties)
+          .set(ConfigurationProperties.LOCATORS, "localhost[" + LOCATOR_PORT + "]")
+          .set(ConfigurationProperties.NAME, "server-1")
+          .create();
+      CacheServer cacheServer = cache.addCacheServer();
+      cacheServer.setPort(0);
+      cacheServer.start();
+
+      cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+    });
+  }
+
+  /**
+   * Starts the worker and herder cluster and then sleeps for 20 seconds before returning.
+   *
+   * @param maxTasks passed to the cluster's {@code start} method as a string
+   * @return the started cluster
+   * @throws RuntimeException if the cluster cannot be started or the wait is interrupted; the
+   *         original exception is attached as the cause
+   */
+  protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks) {
+    WorkerAndHerderCluster workerAndHerderCluster = new WorkerAndHerderCluster();
+    try {
+      workerAndHerderCluster.start(String.valueOf(maxTasks));
+      // NOTE(review): presumably gives the connector time to come up - confirm whether a
+      // readiness check could replace this fixed delay.
+      Thread.sleep(20000);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Could not start the worker and herder cluster", e);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not start the worker and herder cluster", e);
+    }
+    return workerAndHerderCluster;
+  }
+
+  /**
+   * Creates a Geode client cache inside the given VM, pointed at the locator on port 10334, and
+   * creates a PROXY client region in it.
+   *
+   * @param client the DUnit VM that hosts the client
+   * @param regionName the name of the client region to create
+   */
+  protected static void startGeodeClientAndRegion(VM client, String regionName) {
+    client.invoke(() -> {
+      ClientCache clientCache = new ClientCacheFactory()
+          .addPoolLocator("localhost", LOCATOR_PORT)
+          .create();
+
+      clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+    });
+  }
+
+  /**
+   * Puts {@code num} entries ("KEY0" -> "VALUE0", "KEY1" -> "VALUE1", ...) into the named region
+   * from the given client VM.
+   *
+   * @param client the DUnit VM that hosts the client
+   * @param regionName the region to write to
+   * @param num the number of entries to put
+   */
+  protected static void putDataIntoGeodeCluster(VM client, String regionName, int num) {
+    client.invoke(() -> {
+      // NOTE(review): relies on create() handing back the client cache that
+      // startGeodeClientAndRegion already created in this VM - confirm.
+      ClientCache clientCache = new ClientCacheFactory()
+          .addPoolLocator("localhost", LOCATOR_PORT)
+          .create();
+      Region<String, String> region = clientCache.getRegion(regionName);
+      for (int i = 0; i < num; i++) {
+        region.put("KEY" + i, "VALUE" + i);
+      }
+    });
+  }
+
+}