This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 99fb872  In Kafka consumer wrapper, group multiple messages when the app polls (#1115)
99fb872 is described below

commit 99fb872f2f27991de5a974a9b8434d3d6aa9366b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jan 31 09:13:45 2018 -0800

    In Kafka consumer wrapper, group multiple messages when the app polls (#1115)
    
    * In Kafka consumer wrapper, group multiple messages when the app polls
    
    * Fixed unit tests
---
 .../client/kafka/compat/tests/KafkaApiTest.java    | 20 +++---
 .../clients/consumer/PulsarKafkaConsumer.java      | 68 +++++++++++---------
 .../kafka/compat/tests/KafkaConsumerTest.java      | 72 ++++++++++++----------
 3 files changed, 91 insertions(+), 69 deletions(-)
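
For context: before this change, poll() handed back exactly one record per call; after it, a single poll() can return a batch (the diff below caps it at MAX_RECORDS_IN_SINGLE_POLL = 1000). Below is a minimal sketch of the consumption pattern the updated tests exercise. The service URL, topic, and property values are illustrative, and the Properties-based constructor is assumed to mirror KafkaConsumer's:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.PulsarKafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class BatchedPollExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "pulsar://localhost:6650"); // illustrative
            props.put("group.id", "my-subscription");                  // illustrative
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("my-topic")); // illustrative topic

            while (true) {
                // A single poll() may now return many records, not just one
                ConsumerRecords<String, String> records = consumer.poll(100);
                records.forEach(r -> System.out.println(r.key() + " = " + r.value()));
                consumer.commitSync(); // acknowledges everything dequeued so far
            }
        }
    }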

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
index 44357ea..957f3aa 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -91,17 +92,18 @@ public class KafkaApiTest extends BrokerTestBase {
         producer.flush();
         producer.close();
 
-        for (int i = 0; i < 10; i++) {
-            ConsumerRecords<Integer, String> records = consumer.poll(1000);
-            assertEquals(records.count(), 1);
-
-            int idx = i;
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
+            ConsumerRecords<Integer, String> records = consumer.poll(100);
             records.forEach(record -> {
-                log.info("Received record: {}", record);
-                assertEquals(record.key().intValue(), idx);
-                assertEquals(record.value(), "hello-" + idx);
-                assertEquals(record.offset(), offsets.get(idx).longValue());
+                assertEquals(record.key().intValue(), received.get());
+                assertEquals(record.value(), "hello-" + received.get());
+                assertEquals(record.offset(), offsets.get(received.get()).longValue());
+
+                received.incrementAndGet();
             });
+
+            consumer.commitSync();
         }
 
         consumer.close();
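
A side note on the test rewrite above: the loop index moves into an AtomicInteger because the per-record callback is a lambda, and Java only lets lambdas capture locals that are effectively final, so a mutable holder object is the usual workaround. A self-contained illustration (class and variable names here are mine, not from the patch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class LambdaCounterExample {
        public static void main(String[] args) {
            // A plain "int i" local could not be incremented inside the
            // lambda below: captured locals must be effectively final.
            AtomicInteger received = new AtomicInteger();
            List<String> values = Arrays.asList("hello-0", "hello-1", "hello-2");
            values.forEach(v ->
                    System.out.println(v + " -> index " + received.getAndIncrement()));
            System.out.println("total received: " + received.get());
        }
    }
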
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 379bde4..d3dc6e4 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -61,8 +61,6 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
 
-import com.google.common.collect.Lists;
-
 public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
 
     private static final long serialVersionUID = 1L;
@@ -146,6 +144,9 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         Properties properties = new Properties();
         config.originals().forEach((k, v) -> properties.put(k, v));
         ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties);
+        // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
+        // all the acknowledgments sent to the broker within a short time frame
+        clientConf.setUseTcpNoDelay(false);
         try {
             client = PulsarClient.create(serviceUrl, clientConf);
         } catch (PulsarClientException e) {
@@ -270,6 +271,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         });
     }
 
+    private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;
+
     @SuppressWarnings("unchecked")
     @Override
     public ConsumerRecords<K, V> poll(long timeoutMillis) {
@@ -279,40 +282,49 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
             }
 
-            if (isAutoCommit) {
-                // Commit the offset of previously dequeued messages
-                commitAsync();
-            }
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
 
-            DestinationName dn = DestinationName.get(item.consumer.getTopic());
-            String topic = dn.getPartitionedTopicName();
-            int partition = dn.isPartitioned() ? dn.getPartitionIndex() : 0;
-            Message msg = item.message;
-            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
-            long offset = MessageIdUtils.getOffset(msgId);
+            int numberOfRecords = 0;
 
-            TopicPartition tp = new TopicPartition(topic, partition);
+            while (item != null && ++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
+                DestinationName dn = DestinationName.get(item.consumer.getTopic());
+                String topic = dn.getPartitionedTopicName();
+                int partition = dn.isPartitioned() ? dn.getPartitionIndex() : 0;
+                Message msg = item.message;
+                MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+                long offset = MessageIdUtils.getOffset(msgId);
 
-            K key = getKey(topic, msg);
-            V value = valueDeserializer.deserialize(topic, msg.getData());
+                TopicPartition tp = new TopicPartition(topic, partition);
 
-            TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
-            long timestamp = msg.getPublishTime();
+                K key = getKey(topic, msg);
+                V value = valueDeserializer.deserialize(topic, msg.getData());
 
-            if (msg.getEventTime() > 0) {
-                // If we have Event time, use that in preference
-                timestamp = msg.getEventTime();
-                timestampType = TimestampType.CREATE_TIME;
-            }
+                TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
+                long timestamp = msg.getPublishTime();
 
-            ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
-                    timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
+                if (msg.getEventTime() > 0) {
+                    // If we have Event time, use that in preference
+                    timestamp = msg.getEventTime();
+                    timestampType = TimestampType.CREATE_TIME;
+                }
 
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
-            records.put(tp, Lists.newArrayList(consumerRecord));
+                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
+                        timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
+
+                records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
+
+                // Update last offset seen by application
+                lastReceivedOffset.put(tp, offset);
+
+                // Check if we have an item already available
+                item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
+            }
+
+            if (isAutoCommit) {
+                // Commit the offset of previously dequeued messages
+                commitAsync();
+            }
 
-            // Update last offset seen by application
-            lastReceivedOffset.put(tp, offset);
             return new ConsumerRecords<>(records);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
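
The heart of the poll() rewrite above is a bounded drain of the internal receivedMessages queue: block for the first message up to the caller's timeout, then keep taking messages that are already available, stopping at the cap. A standalone sketch of that pattern under illustrative names (BoundedDrain and MAX_BATCH are not the actual class or field):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class BoundedDrain<T> {
        private static final int MAX_BATCH = 1000;
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

        public List<T> drain(long timeoutMillis) throws InterruptedException {
            List<T> batch = new ArrayList<>();
            // Wait only for the first item, up to the caller's timeout
            T item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            while (item != null && batch.size() < MAX_BATCH) {
                batch.add(item);
                // Later items are taken only if they are already queued
                item = queue.poll(0, TimeUnit.MILLISECONDS);
            }
            return batch;
        }
    }
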
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
index 20c3888..a0218b3 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -82,13 +83,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();
@@ -119,13 +121,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+                received.incrementAndGet();
             });
         }
 
@@ -162,18 +165,19 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
 
                 Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                 offsets.put(new TopicPartition(record.topic(), record.partition()),
                         new OffsetAndMetadata(record.offset()));
                 consumer.commitSync(offsets);
+
+                received.incrementAndGet();
             });
         }
 
@@ -225,9 +229,10 @@ public class KafkaConsumerTest extends BrokerTestBase {
 
         consumers.forEach(consumer -> {
             int expectedMessaged = N / consumers.size();
-            for (int i = 0; i < expectedMessaged; i++) {
+
+            for (int i = 0; i < expectedMessaged;) {
                 ConsumerRecords<String, String> records = consumer.poll(100);
-                assertEquals(records.count(), 1);
+                i += records.count();
             }
 
             // No more messages for this consumer
@@ -260,13 +265,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();
@@ -277,13 +283,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
         Thread.sleep(500);
 
         // Messages should be available again
-        for (int i = 0; i < 10; i++) {
+        received.set(0);
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();
@@ -314,13 +321,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();
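
One more note, on the constructor change in PulsarKafkaConsumer above: with setUseTcpNoDelay(false), Nagle's algorithm stays enabled on the connection, so the kernel can coalesce the many small acknowledgment packets a busy consumer sends, at the cost of slightly higher per-ack latency. A minimal sketch of that setup against the pre-2.0 client API this module uses; the service URL is illustrative:

    import org.apache.pulsar.client.api.ClientConfiguration;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;

    public class AckBatchingClient {
        public static PulsarClient create() throws PulsarClientException {
            ClientConfiguration clientConf = new ClientConfiguration();
            // Leaving Nagle on (TCP no-delay off) lets small ack packets
            // be grouped before they hit the wire
            clientConf.setUseTcpNoDelay(false);
            return PulsarClient.create("pulsar://localhost:6650", clientConf); // illustrative URL
        }
    }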

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.
