IGNITE-7752: Update Ignite KafkaStreamer to use new consumer. - Fixes #4363.
Signed-off-by: shroman <rsht...@yahoo.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44391198 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44391198 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44391198 Branch: refs/heads/ignite-8446 Commit: 443911988d246a2cc62abc450758a3702552046a Parents: ff89193 Author: shroman <rsht...@yahoo.com> Authored: Wed Aug 1 16:53:28 2018 +0900 Committer: shroman <rsht...@yahoo.com> Committed: Wed Aug 1 16:53:28 2018 +0900 ---------------------------------------------------------------------- modules/kafka/pom.xml | 12 +- .../ignite/stream/kafka/KafkaStreamer.java | 173 ++++++++++--------- .../kafka/KafkaIgniteStreamerSelfTest.java | 39 ++--- .../ignite/stream/kafka/TestKafkaBroker.java | 18 +- 4 files changed, 121 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml index f015d21..18ffcaa 100644 --- a/modules/kafka/pom.xml +++ b/modules/kafka/pom.xml @@ -20,7 +20,8 @@ <!-- POM file. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -43,7 +44,7 @@ <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> @@ -55,6 +56,13 @@ <dependency> <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${kafka.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> <artifactId>connect-runtime</artifactId> <version>${kafka.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java index ed3530b..bdbd916 100644 --- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java +++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java @@ -17,68 +17,63 @@ package org.apache.ignite.stream.kafka; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; +import java.util.stream.IntStream; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.stream.StreamAdapter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.WakeupException; /** * Server that subscribes to topic messages from Kafka broker and streams its to key-value pairs into * {@link IgniteDataStreamer} instance. * <p> * Uses Kafka's High Level Consumer API to read messages from Kafka. - * - * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Consumer Group - * Example</a> */ -public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V> { - /** Retry timeout. */ - private static final long DFLT_RETRY_TIMEOUT = 10000; +public class KafkaStreamer<K, V> extends StreamAdapter<ConsumerRecord, K, V> { + /** Default polling timeout. */ + private final static long DFLT_TIMEOUT = 100; /** Logger. */ private IgniteLogger log; - /** Executor used to submit kafka streams. */ + /** Polling tasks executor. */ private ExecutorService executor; - /** Topic. */ - private String topic; + /** Topics. */ + private List<String> topics; - /** Number of threads to process kafka streams. */ + /** Number of threads. */ private int threads; /** Kafka consumer config. */ - private ConsumerConfig consumerCfg; - - /** Kafka consumer connector. */ - private ConsumerConnector consumer; + private Properties consumerCfg; - /** Retry timeout. */ - private long retryTimeout = DFLT_RETRY_TIMEOUT; + /** Polling timeout. */ + private long timeout = DFLT_TIMEOUT; - /** Stopped. */ - private volatile boolean stopped; + /** Kafka consumer tasks. */ + private final List<ConsumerTask> consumerTasks = new ArrayList<>(); /** - * Sets the topic name. + * Sets the topic names. * - * @param topic Topic name. + * @param topics Topic names. */ - public void setTopic(String topic) { - this.topic = topic; + public void setTopic(List<String> topics) { + this.topics = topics; } /** @@ -95,19 +90,19 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[] * * @param consumerCfg Consumer configuration. */ - public void setConsumerConfig(ConsumerConfig consumerCfg) { + public void setConsumerConfig(Properties consumerCfg) { this.consumerCfg = consumerCfg; } /** - * Sets the retry timeout. + * Sets the polling timeout for Kafka tasks. * - * @param retryTimeout Retry timeout. + * @param timeout Timeout. */ - public void setRetryTimeout(long retryTimeout) { - A.ensure(retryTimeout > 0, "retryTimeout > 0"); + public void setTimeout(long timeout) { + A.ensure(timeout > 0, "timeout > 0"); - this.retryTimeout = retryTimeout; + this.timeout = timeout; } /** @@ -118,7 +113,7 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[] public void start() { A.notNull(getStreamer(), "streamer"); A.notNull(getIgnite(), "ignite"); - A.notNull(topic, "topic"); + A.notNull(topics, "topics"); A.notNull(consumerCfg, "kafka consumer config"); A.ensure(threads > 0, "threads > 0"); A.ensure(null != getSingleTupleExtractor() || null != getMultipleTupleExtractor(), @@ -126,65 +121,20 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[] log = getIgnite().log(); - consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerCfg); - - Map<String, Integer> topicCntMap = new HashMap<>(); - - topicCntMap.put(topic, threads); - - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCntMap); - - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); - - // Now launch all the consumer threads. executor = Executors.newFixedThreadPool(threads); - stopped = false; + IntStream.range(0, threads).forEach(i -> consumerTasks.add(new ConsumerTask(consumerCfg))); - // Now create an object to consume the messages. - for (final KafkaStream<byte[], byte[]> stream : streams) { - executor.execute(new Runnable() { - @Override public void run() { - while (!stopped) { - try { - MessageAndMetadata<byte[], byte[]> msg; - - for (ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.hasNext() && !stopped; ) { - msg = it.next(); - - try { - addMessage(msg); - } - catch (Exception e) { - U.error(log, "Message is ignored due to an error [msg=" + msg + ']', e); - } - } - } - catch (Exception e) { - U.error(log, "Message can't be consumed from stream. Retry after " + - retryTimeout + " ms.", e); - - try { - Thread.sleep(retryTimeout); - } - catch (InterruptedException ignored) { - // No-op. - } - } - } - } - }); - } + for (ConsumerTask task : consumerTasks) + executor.submit(task); } /** * Stops streamer. */ public void stop() { - stopped = true; - - if (consumer != null) - consumer.shutdown(); + for (ConsumerTask task : consumerTasks) + task.stop(); if (executor != null) { executor.shutdown(); @@ -200,4 +150,55 @@ public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[] } } } + + /** Polling task. */ + class ConsumerTask implements Callable<Void> { + /** Kafka consumer. */ + private final KafkaConsumer<?, ?> consumer; + + /** Stopped. */ + private volatile boolean stopped; + + /** Constructor. */ + public ConsumerTask(Properties consumerCfg) { + this.consumer = new KafkaConsumer<>(consumerCfg); + } + + /** {@inheritDoc} */ + @Override public Void call() { + consumer.subscribe(topics); + + try { + while (!stopped) { + for (ConsumerRecord record : consumer.poll(timeout)) { + try { + addMessage(record); + } + catch (Exception e) { + U.error(log, "Record is ignored due to an error [record = " + record + ']', e); + } + } + } + } + catch (WakeupException we) { + log.info("Consumer is being stopped."); + } + catch (KafkaException ke) { + log.error("Kafka error", ke); + } + finally { + consumer.close(); + } + + return null; + } + + /** Stops the polling task. */ + public void stop() { + stopped = true; + + if (consumer != null) + consumer.wakeup(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java index 00cb4fc..48d4a8d 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.stream.kafka; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -27,17 +28,16 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import kafka.consumer.ConsumerConfig; -import kafka.message.MessageAndMetadata; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.stream.StreamMultipleTupleExtractor; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; @@ -167,23 +167,22 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest { kafkaStmr.setStreamer(stmr); // Set the topic. - kafkaStmr.setTopic(topic); + kafkaStmr.setTopic(Arrays.asList(topic)); // Set the number of threads. kafkaStmr.setThreads(4); // Set the consumer configuration. kafkaStmr.setConsumerConfig( - createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX")); + createDefaultConsumerConfig(embeddedBroker.getBrokerAddress(), "groupX")); kafkaStmr.setMultipleTupleExtractor( - new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() { - @Override public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) { + record -> { Map<String, String> entries = new HashMap<>(); try { - String key = new String(msg.key()); - String val = new String(msg.message()); + String key = (String)record.key(); + String val = (String)record.value(); // Convert the message into number of cache entries with same key or dynamic key from actual message. // For now using key as cache entry key and value as cache entry value - for test purpose. @@ -194,8 +193,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest { } return entries; - } - }); + }); // Start kafka streamer. kafkaStmr.start(); @@ -227,23 +225,22 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest { /** * Creates default consumer config. * - * @param zooKeeper ZooKeeper address <server:port>. + * @param servers Bootstrap servers' address in the form of <server:port;server:port>. * @param grpId Group Id for kafka subscriber. * @return Kafka consumer configuration. */ - private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) { - A.notNull(zooKeeper, "zookeeper"); + private Properties createDefaultConsumerConfig(String servers, String grpId) { + A.notNull(servers, "bootstrap servers"); A.notNull(grpId, "groupId"); Properties props = new Properties(); - props.put("zookeeper.connect", zooKeeper); - props.put("group.id", grpId); - props.put("zookeeper.session.timeout.ms", "400"); - props.put("zookeeper.sync.time.ms", "200"); - props.put("auto.commit.interval.ms", "1000"); - props.put("auto.offset.reset", "smallest"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, grpId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - return new ConsumerConfig(props); + return props; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/44391198/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java index 803d55e..4f0d1d3 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java @@ -36,7 +36,9 @@ import org.apache.curator.test.TestingServer; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import scala.Tuple2; /** @@ -202,15 +204,6 @@ public class TestKafkaBroker { } /** - * Obtains Zookeeper address. - * - * @return Zookeeper address. - */ - public String getZookeeperAddress() { - return BROKER_HOST + ":" + ZK_PORT; - } - - /** * Obtains producer config. * * @return Kafka Producer config. @@ -218,9 +211,10 @@ public class TestKafkaBroker { private Properties getProducerConfig() { Properties props = new Properties(); - props.put("bootstrap.servers", getBrokerAddress()); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return props; }