IGNITE-7752: Update Ignite KafkaStreamer to use new consumer. - Fixes #4363.

Signed-off-by: shroman <rsht...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44391198
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44391198
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44391198

Branch: refs/heads/ignite-8446
Commit: 443911988d246a2cc62abc450758a3702552046a
Parents: ff89193
Author: shroman <rsht...@yahoo.com>
Authored: Wed Aug 1 16:53:28 2018 +0900
Committer: shroman <rsht...@yahoo.com>
Committed: Wed Aug 1 16:53:28 2018 +0900

----------------------------------------------------------------------
 modules/kafka/pom.xml                           |  12 +-
 .../ignite/stream/kafka/KafkaStreamer.java      | 173 ++++++++++---------
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  39 ++---
 .../ignite/stream/kafka/TestKafkaBroker.java    |  18 +-
 4 files changed, 121 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index f015d21..18ffcaa 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -20,7 +20,8 @@
 <!--
     POM file.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -43,7 +44,7 @@
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka-clients</artifactId>
             <version>${kafka.version}</version>
         </dependency>
 
@@ -55,6 +56,13 @@
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
             <artifactId>connect-runtime</artifactId>
             <version>${kafka.version}</version>
             <scope>test</scope>
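
With kafka_2.11 demoted to test scope (it still backs the embedded test broker), the module's main
code path now depends only on the pure-Java kafka-clients artifact. A minimal sketch of the
new-consumer lifecycle the rewritten streamer builds on; the broker address and topic name are
illustrative, not part of this commit:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class NewConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();

            // The new consumer talks to the brokers directly; no zookeeper.connect property.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupX");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // illustrative topic

                // A single poll with a 100 ms timeout, matching the streamer's DFLT_TIMEOUT.
                for (ConsumerRecord<String, String> rec : consumer.poll(100))
                    System.out.println(rec.key() + " -> " + rec.value());
            }
        }
    }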

http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index ed3530b..bdbd916 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -17,68 +17,63 @@
 
 package org.apache.ignite.stream.kafka;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.stream.StreamAdapter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.WakeupException;
 
 /**
 * Server that subscribes to topic messages from a Kafka broker and streams them as key-value pairs into
  * {@link IgniteDataStreamer} instance.
  * <p>
  * Uses Kafka's High Level Consumer API to read messages from Kafka.
- *
- * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
- * Example</a>
  */
-public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V> {
-    /** Retry timeout. */
-    private static final long DFLT_RETRY_TIMEOUT = 10000;
+public class KafkaStreamer<K, V> extends StreamAdapter<ConsumerRecord, K, V> {
+    /** Default polling timeout. */
+    private static final long DFLT_TIMEOUT = 100;
 
     /** Logger. */
     private IgniteLogger log;
 
-    /** Executor used to submit kafka streams. */
+    /** Polling tasks executor. */
     private ExecutorService executor;
 
-    /** Topic. */
-    private String topic;
+    /** Topics. */
+    private List<String> topics;
 
-    /** Number of threads to process kafka streams. */
+    /** Number of threads. */
     private int threads;
 
     /** Kafka consumer config. */
-    private ConsumerConfig consumerCfg;
-
-    /** Kafka consumer connector. */
-    private ConsumerConnector consumer;
+    private Properties consumerCfg;
 
-    /** Retry timeout. */
-    private long retryTimeout = DFLT_RETRY_TIMEOUT;
+    /** Polling timeout. */
+    private long timeout = DFLT_TIMEOUT;
 
-    /** Stopped. */
-    private volatile boolean stopped;
+    /** Kafka consumer tasks. */
+    private final List<ConsumerTask> consumerTasks = new ArrayList<>();
 
     /**
-     * Sets the topic name.
+     * Sets the topic names.
      *
-     * @param topic Topic name.
+     * @param topics Topic names.
      */
-    public void setTopic(String topic) {
-        this.topic = topic;
+    public void setTopic(List<String> topics) {
+        this.topics = topics;
     }
 
     /**
@@ -95,19 +90,19 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[]
      *
      * @param consumerCfg Consumer configuration.
      */
-    public void setConsumerConfig(ConsumerConfig consumerCfg) {
+    public void setConsumerConfig(Properties consumerCfg) {
         this.consumerCfg = consumerCfg;
     }
 
     /**
-     * Sets the retry timeout.
+     * Sets the polling timeout for Kafka tasks.
      *
-     * @param retryTimeout Retry timeout.
+     * @param timeout Timeout.
      */
-    public void setRetryTimeout(long retryTimeout) {
-        A.ensure(retryTimeout > 0, "retryTimeout > 0");
+    public void setTimeout(long timeout) {
+        A.ensure(timeout > 0, "timeout > 0");
 
-        this.retryTimeout = retryTimeout;
+        this.timeout = timeout;
     }
 
     /**
@@ -118,7 +113,7 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[]
     public void start() {
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");
-        A.notNull(topic, "topic");
+        A.notNull(topics, "topics");
         A.notNull(consumerCfg, "kafka consumer config");
         A.ensure(threads > 0, "threads > 0");
         A.ensure(null != getSingleTupleExtractor() || null != getMultipleTupleExtractor(),
@@ -126,65 +121,20 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[]
 
         log = getIgnite().log();
 
-        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerCfg);
-
-        Map<String, Integer> topicCntMap = new HashMap<>();
-
-        topicCntMap.put(topic, threads);
-
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCntMap);
-
-        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-
-        // Now launch all the consumer threads.
         executor = Executors.newFixedThreadPool(threads);
 
-        stopped = false;
+        IntStream.range(0, threads).forEach(i -> consumerTasks.add(new ConsumerTask(consumerCfg)));
 
-        // Now create an object to consume the messages.
-        for (final KafkaStream<byte[], byte[]> stream : streams) {
-            executor.execute(new Runnable() {
-                @Override public void run() {
-                    while (!stopped) {
-                        try {
-                            MessageAndMetadata<byte[], byte[]> msg;
-
-                            for (ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.hasNext() && !stopped; ) {
-                                msg = it.next();
-
-                                try {
-                                    addMessage(msg);
-                                }
-                                catch (Exception e) {
-                                    U.error(log, "Message is ignored due to an 
error [msg=" + msg + ']', e);
-                                }
-                            }
-                        }
-                        catch (Exception e) {
-                            U.error(log, "Message can't be consumed from 
stream. Retry after " +
-                                retryTimeout + " ms.", e);
-
-                            try {
-                                Thread.sleep(retryTimeout);
-                            }
-                            catch (InterruptedException ignored) {
-                                // No-op.
-                            }
-                        }
-                    }
-                }
-            });
-        }
+        for (ConsumerTask task : consumerTasks)
+            executor.submit(task);
     }
 
     /**
      * Stops streamer.
      */
     public void stop() {
-        stopped = true;
-
-        if (consumer != null)
-            consumer.shutdown();
+        for (ConsumerTask task : consumerTasks)
+            task.stop();
 
         if (executor != null) {
             executor.shutdown();
@@ -200,4 +150,55 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[]
             }
         }
     }
+
+    /** Polling task. */
+    class ConsumerTask implements Callable<Void> {
+        /** Kafka consumer. */
+        private final KafkaConsumer<?, ?> consumer;
+
+        /** Stopped. */
+        private volatile boolean stopped;
+
+        /** Constructor. */
+        public ConsumerTask(Properties consumerCfg) {
+            this.consumer = new KafkaConsumer<>(consumerCfg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            consumer.subscribe(topics);
+
+            try {
+                while (!stopped) {
+                    for (ConsumerRecord record : consumer.poll(timeout)) {
+                        try {
+                            addMessage(record);
+                        }
+                        catch (Exception e) {
+                            U.error(log, "Record is ignored due to an error 
[record = " + record + ']', e);
+                        }
+                    }
+                }
+            }
+            catch (WakeupException we) {
+                log.info("Consumer is being stopped.");
+            }
+            catch (KafkaException ke) {
+                log.error("Kafka error", ke);
+            }
+            finally {
+                consumer.close();
+            }
+
+            return null;
+        }
+
+        /** Stops the polling task. */
+        public void stop() {
+            stopped = true;
+
+            if (consumer != null)
+                consumer.wakeup();
+        }
+    }
 }
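
For reference, a usage sketch of the rewritten streamer, based on the setters visible above and in the
self test below: setTopic() now takes a list of topics, setConsumerConfig() takes plain
java.util.Properties, and setTimeout() replaces setRetryTimeout(). The cache and topic names are
illustrative, and consumerProps stands for a consumer Properties object like the one built in the test:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteDataStreamer;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.stream.kafka.KafkaStreamer;

    public class KafkaStreamerUsageSketch {
        public static void run(Properties consumerProps) {
            Ignite ignite = Ignition.ignite(); // assumes a running Ignite node

            try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) { // illustrative cache
                KafkaStreamer<String, String> kafkaStmr = new KafkaStreamer<>();

                kafkaStmr.setIgnite(ignite);
                kafkaStmr.setStreamer(stmr);
                kafkaStmr.setTopic(Arrays.asList("my-topic")); // illustrative topic; a list now
                kafkaStmr.setThreads(4);                       // one KafkaConsumer task per thread
                kafkaStmr.setTimeout(200);                     // poll timeout in ms
                kafkaStmr.setConsumerConfig(consumerProps);

                // One Kafka record may produce several cache entries; here it is 1:1.
                kafkaStmr.setMultipleTupleExtractor(record -> {
                    Map<String, String> entries = new HashMap<>();

                    entries.put((String)record.key(), (String)record.value());

                    return entries;
                });

                kafkaStmr.start();

                // ... stream until done; stop() wakes up every consumer and shuts down the executor.
                kafkaStmr.stop();
            }
        }
    }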

http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 00cb4fc..48d4a8d 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.stream.kafka;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -27,17 +28,16 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import kafka.consumer.ConsumerConfig;
-import kafka.message.MessageAndMetadata;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
 
@@ -167,23 +167,22 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
             kafkaStmr.setStreamer(stmr);
 
             // Set the topic.
-            kafkaStmr.setTopic(topic);
+            kafkaStmr.setTopic(Arrays.asList(topic));
 
             // Set the number of threads.
             kafkaStmr.setThreads(4);
 
             // Set the consumer configuration.
             kafkaStmr.setConsumerConfig(
-                createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
+                createDefaultConsumerConfig(embeddedBroker.getBrokerAddress(), "groupX"));
 
             kafkaStmr.setMultipleTupleExtractor(
-                new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() {
-                @Override public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) {
+                record -> {
                     Map<String, String> entries = new HashMap<>();
 
                     try {
-                        String key = new String(msg.key());
-                        String val = new String(msg.message());
+                        String key = (String)record.key();
+                        String val = (String)record.value();
 
                         // Convert the message into a number of cache entries with the same key or a dynamic key from the actual message.
                         // For now, use the key as the cache entry key and the value as the cache entry value - for test purposes.
@@ -194,8 +193,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
                     }
 
                     return entries;
-                }
-            });
+                });
 
             // Start kafka streamer.
             kafkaStmr.start();
@@ -227,23 +225,22 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
     /**
      * Creates default consumer config.
      *
-     * @param zooKeeper ZooKeeper address &lt;server:port&gt;.
+     * @param servers Bootstrap servers' addresses in the form of &lt;server:port,server:port&gt;.
      * @param grpId Group Id for kafka subscriber.
      * @return Kafka consumer configuration.
      */
-    private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) {
-        A.notNull(zooKeeper, "zookeeper");
+    private Properties createDefaultConsumerConfig(String servers, String grpId) {
+        A.notNull(servers, "bootstrap servers");
         A.notNull(grpId, "groupId");
 
         Properties props = new Properties();
 
-        props.put("zookeeper.connect", zooKeeper);
-        props.put("group.id", grpId);
-        props.put("zookeeper.session.timeout.ms", "400");
-        props.put("zookeeper.sync.time.ms", "200");
-        props.put("auto.commit.interval.ms", "1000");
-        props.put("auto.offset.reset", "smallest");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, grpId);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
-        return new ConsumerConfig(props);
+        return props;
     }
 }
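
Note the property-name migration in createDefaultConsumerConfig(): the ZooKeeper settings have no
new-consumer equivalents, group.id keeps its meaning, the auto.offset.reset value "smallest" becomes
"earliest", and the deserializers must now be configured explicitly. A side-by-side sketch for
comparison (ZooKeeper and broker addresses illustrative):

    // Old high-level consumer: coordination went through ZooKeeper.
    Properties oldProps = new Properties();
    oldProps.put("zookeeper.connect", "localhost:2181"); // illustrative address
    oldProps.put("group.id", "groupX");
    oldProps.put("auto.offset.reset", "smallest");       // old value name

    // New consumer: talks to brokers directly and needs explicit deserializers.
    Properties newProps = new Properties();
    newProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
    newProps.put(ConsumerConfig.GROUP_ID_CONFIG, "groupX");
    newProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       // renamed value
    newProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    newProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());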

http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 803d55e..4f0d1d3 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -36,7 +36,9 @@ import org.apache.curator.test.TestingServer;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import scala.Tuple2;
 
 /**
@@ -202,15 +204,6 @@ public class TestKafkaBroker {
     }
 
     /**
-     * Obtains Zookeeper address.
-     *
-     * @return Zookeeper address.
-     */
-    public String getZookeeperAddress() {
-        return BROKER_HOST + ":" + ZK_PORT;
-    }
-
-    /**
      * Obtains producer config.
      *
      * @return Kafka Producer config.
@@ -218,9 +211,10 @@ public class TestKafkaBroker {
     private Properties getProducerConfig() {
         Properties props = new Properties();
 
-        props.put("bootstrap.servers", getBrokerAddress());
-        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress());
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
         return props;
     }
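
The producer side pairs with the config above; a short sketch of sending a test record with it, assuming
an illustrative broker address and topic name:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TestProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();

            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // close() at the end of try-with-resources flushes any in-flight records.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key1", "value1")); // illustrative topic
            }
        }
    }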
