This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 99fb872  In Kafka consumer wrapper, group multiple messages when the app polls (#1115)
99fb872 is described below

commit 99fb872f2f27991de5a974a9b8434d3d6aa9366b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jan 31 09:13:45 2018 -0800

    In Kafka consumer wrapper, group multiple messages when the app polls (#1115)

    * In Kafka consumer wrapper, group multiple messages when the app polls

    * Fixed unit tests
---
 .../client/kafka/compat/tests/KafkaApiTest.java   | 20 +++---
 .../clients/consumer/PulsarKafkaConsumer.java     | 68 +++++++++++---------
 .../kafka/compat/tests/KafkaConsumerTest.java     | 72 ++++++++++++----------
 3 files changed, 91 insertions(+), 69 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
index 44357ea..957f3aa 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaApiTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -91,17 +92,18 @@ public class KafkaApiTest extends BrokerTestBase {
         producer.flush();
         producer.close();
 
-        for (int i = 0; i < 10; i++) {
-            ConsumerRecords<Integer, String> records = consumer.poll(1000);
-            assertEquals(records.count(), 1);
-
-            int idx = i;
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
+            ConsumerRecords<Integer, String> records = consumer.poll(100);
             records.forEach(record -> {
-                log.info("Received record: {}", record);
-                assertEquals(record.key().intValue(), idx);
-                assertEquals(record.value(), "hello-" + idx);
-                assertEquals(record.offset(), offsets.get(idx).longValue());
+                assertEquals(record.key().intValue(), received.get());
+                assertEquals(record.value(), "hello-" + received.get());
+                assertEquals(record.offset(), offsets.get(received.get()).longValue());
+
+                received.incrementAndGet();
             });
+
+            consumer.commitSync();
         }
 
         consumer.close();
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 379bde4..d3dc6e4 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -61,8 +61,6 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
 
-import com.google.common.collect.Lists;
-
 public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
 
     private static final long serialVersionUID = 1L;
@@ -146,6 +144,9 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         Properties properties = new Properties();
         config.originals().forEach((k, v) -> properties.put(k, v));
         ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties);
+        // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
+        // all the acknowledgments sent to broker within a short time frame
+        clientConf.setUseTcpNoDelay(false);
         try {
             client = PulsarClient.create(serviceUrl, clientConf);
         } catch (PulsarClientException e) {
@@ -270,6 +271,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         });
     }
 
+    private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;
+
     @SuppressWarnings("unchecked")
     @Override
     public ConsumerRecords<K, V> poll(long timeoutMillis) {
@@ -279,40 +282,49 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
             }
 
-            if (isAutoCommit) {
-                // Commit the offset of previously dequeued messages
-                commitAsync();
-            }
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
 
-            DestinationName dn = DestinationName.get(item.consumer.getTopic());
-            String topic = dn.getPartitionedTopicName();
-            int partition = dn.isPartitioned() ? dn.getPartitionIndex() : 0;
-            Message msg = item.message;
-            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
-            long offset = MessageIdUtils.getOffset(msgId);
+            int numberOfRecords = 0;
 
-            TopicPartition tp = new TopicPartition(topic, partition);
+            while (item != null && ++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
+                DestinationName dn = DestinationName.get(item.consumer.getTopic());
+                String topic = dn.getPartitionedTopicName();
+                int partition = dn.isPartitioned() ? dn.getPartitionIndex() : 0;
+                Message msg = item.message;
+                MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+                long offset = MessageIdUtils.getOffset(msgId);
 
-            K key = getKey(topic, msg);
-            V value = valueDeserializer.deserialize(topic, msg.getData());
+                TopicPartition tp = new TopicPartition(topic, partition);
 
-            TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
-            long timestamp = msg.getPublishTime();
+                K key = getKey(topic, msg);
+                V value = valueDeserializer.deserialize(topic, msg.getData());
 
-            if (msg.getEventTime() > 0) {
-                // If we have Event time, use that in preference
-                timestamp = msg.getEventTime();
-                timestampType = TimestampType.CREATE_TIME;
-            }
+                TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
+                long timestamp = msg.getPublishTime();
 
-            ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
-                    timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
+                if (msg.getEventTime() > 0) {
+                    // If we have Event time, use that in preference
+                    timestamp = msg.getEventTime();
+                    timestampType = TimestampType.CREATE_TIME;
+                }
 
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
-            records.put(tp, Lists.newArrayList(consumerRecord));
+                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
+                        timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
+
+                records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
+
+                // Update last offset seen by application
+                lastReceivedOffset.put(tp, offset);
+
+                // Check if we have an item already available
+                item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
+            }
+
+            if (isAutoCommit) {
+                // Commit the offset of previously dequeued messages
+                commitAsync();
+            }
 
-            // Update last offset seen by application
-            lastReceivedOffset.put(tp, offset);
             return new ConsumerRecords<>(records);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
index 20c3888..a0218b3 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -82,13 +83,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();
@@ -119,13 +121,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+                received.incrementAndGet();
             });
         }
 
@@ -162,18 +165,19 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
 
                 Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                 offsets.put(new TopicPartition(record.topic(), record.partition()),
                         new OffsetAndMetadata(record.offset()));
                 consumer.commitSync(offsets);
+
+                received.incrementAndGet();
             });
         }
 
@@ -225,9 +229,10 @@ public class KafkaConsumerTest extends BrokerTestBase {
         consumers.forEach(consumer -> {
             int expectedMessaged = N / consumers.size();
-            for (int i = 0; i < expectedMessaged; i++) {
+
+            for (int i = 0; i < expectedMessaged;) {
                 ConsumerRecords<String, String> records = consumer.poll(100);
-                assertEquals(records.count(), 1);
+                i += records.count();
             }
 
             // No more messages for this consumer
@@ -260,13 +265,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
             pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();
@@ -277,13 +283,14 @@
         Thread.sleep(500);
 
         // Messages should be available again
-        for (int i = 0; i < 10; i++) {
+        received.set(0);
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();
@@ -314,13 +321,14 @@ public class KafkaConsumerTest extends BrokerTestBase {
            pulsarProducer.send(msg);
         }
 
-        for (int i = 0; i < 10; i++) {
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 1);
-            int idx = i;
             records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(idx));
-                assertEquals(record.value(), "hello-" + idx);
+                assertEquals(record.key(), Integer.toString(received.get()));
+                assertEquals(record.value(), "hello-" + received.get());
+
+                received.incrementAndGet();
             });
 
             consumer.commitSync();

--
To stop receiving notification emails like this one, please contact mme...@apache.org.
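
For readers following the change: after this patch a single poll() on the Kafka consumer wrapper can return a batch of records instead of exactly one, which is why the tests above now count the records they receive rather than asserting records.count() == 1. Below is a minimal, hypothetical sketch (not part of the patch) of that polling pattern against the wrapper; the service URL, topic, group id and expected message count are placeholder values, and the constructor and property names simply follow the standard Kafka consumer API that the wrapper mirrors.

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.PulsarKafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class BatchedPollSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder connection settings -- adjust for the target Pulsar deployment
            props.put("bootstrap.servers", "pulsar://localhost:6650");
            props.put("group.id", "my-subscription");
            props.put("enable.auto.commit", "false");

            Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props,
                    new StringDeserializer(), new StringDeserializer());
            consumer.subscribe(Collections.singletonList("persistent://sample/standalone/ns/my-topic"));

            final int expectedMessages = 10; // placeholder: how many messages the app expects
            AtomicInteger received = new AtomicInteger();

            while (received.get() < expectedMessages) {
                // A single poll() may now return several records grouped by the wrapper
                ConsumerRecords<String, String> records = consumer.poll(100);
                records.forEach(record -> {
                    System.out.printf("key=%s value=%s offset=%d%n",
                            record.key(), record.value(), record.offset());
                    received.incrementAndGet();
                });

                // Acknowledge everything dequeued so far in one call
                consumer.commitSync();
            }

            consumer.close();
        }
    }

With auto-commit enabled the wrapper instead calls commitAsync() once per poll() for the previously dequeued messages, and the setUseTcpNoDelay(false) change in the client configuration lets those acknowledgments be grouped on the wire within a short time frame.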