Repository: camel
Updated Branches:
  refs/heads/master 3b508f55d -> 3485b6152


CAMEL-10115: introduced pollTimeoutMs with default 5000


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3485b615
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3485b615
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3485b615

Branch: refs/heads/master
Commit: 3485b61520d0f3d2823f2fb689ad75600f4d3cfd
Parents: 3b508f5
Author: Arno Noordover <anoordo...@users.noreply.github.com>
Authored: Sun Jul 3 20:30:03 2016 +0200
Committer: Arno Noordover <anoordo...@users.noreply.github.com>
Committed: Mon Jul 4 14:24:17 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc |  7 ++-
 .../component/kafka/KafkaConfiguration.java     | 15 +++++-
 .../camel/component/kafka/KafkaConsumer.java    |  4 +-
 .../component/kafka/KafkaConsumerTest.java      | 12 +++--
 .../clients/consumer/KafkaConsumerTest.java     | 52 ++++++++++++++++++++
 5 files changed, 83 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3485b615/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc 
b/components/camel-kafka/src/main/docs/kafka.adoc
index 557eec2..575c6a2 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -87,8 +87,10 @@ The Kafka component supports 1 options which are listed 
below.
 
 
 
+
+
 // endpoint options: START
-The Kafka component supports 73 endpoint options which are listed below:
+The Kafka component supports 74 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -115,6 +117,7 @@ The Kafka component supports 73 endpoint options which are 
listed below:
 | keyDeserializer | consumer | 
org.apache.kafka.common.serialization.StringDeserializer | String | 
Deserializer class for key that implements the Deserializer interface.
 | maxPartitionFetchBytes | consumer | 1048576 | Integer | The maximum amount 
of data per-partition the server will return. The maximum total memory used for 
a request will be partitions max.partition.fetch.bytes. This size must be at 
least as large as the maximum message size the server allows or else it is 
possible for the producer to send messages larger than the consumer can fetch. 
If that happens the consumer can get stuck trying to fetch a large message on a 
certain partition.
 | partitionAssignor | consumer | 
org.apache.kafka.clients.consumer.RangeAssignor | String | The class name of 
the partition assignment strategy that the client will use to distribute 
partition ownership amongst consumer instances when group management is used
+| pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the 
KafkaConsumer.
 | seekToBeginning | consumer | false | boolean | If the option is true then 
KafkaConsumer will read from beginning on startup.
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect 
failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | 
org.apache.kafka.common.serialization.StringDeserializer | String | 
Deserializer class for value that implements the Deserializer interface.
@@ -180,6 +183,8 @@ The Kafka component supports 73 endpoint options which are 
listed below:
 
 
 
+
+
 For more information about Producer/Consumer configuration:
 
 
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/3485b615/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 35ebd36..c69f32f 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -71,7 +71,9 @@ public class KafkaConfiguration {
     //session.timeout.ms
     @UriParam(label = "consumer", defaultValue = "30000")
     private Integer sessionTimeoutMs = 30000;
-    //auto.offset.reset
+    @UriParam(label = "consumer", defaultValue = "5000")
+    private Long pollTimeoutMs = 5000L;
+    //auto.offset.reset1
     @UriParam(label = "consumer", defaultValue = "latest", enums = 
"latest,earliest,none")
     private String autoOffsetReset = "latest";
     //partition.assignment.strategy
@@ -1074,6 +1076,17 @@ public class KafkaConfiguration {
         this.sessionTimeoutMs = sessionTimeoutMs;
     }
 
+    public Long getPollTimeoutMs() {
+        return pollTimeoutMs;
+    }
+
+    /**
+     * The timeout used when polling the KafkaConsumer.
+     */
+    public void setPollTimeoutMs(Long pollTimeoutMs) {
+        this.pollTimeoutMs = pollTimeoutMs;
+    }
+
     public String getPartitionAssignor() {
         return partitionAssignor;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3485b615/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 4780dfe..65ec6e3 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -41,11 +41,13 @@ public class KafkaConsumer extends DefaultConsumer {
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
+    private final Long pollTimeoutMs;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
+        this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
 
         if (endpoint.getBrokers() == null) {
             throw new IllegalArgumentException("BootStrap servers must be 
specified");
@@ -125,7 +127,7 @@ public class KafkaConsumer extends DefaultConsumer {
                     consumer.seekToBeginning(consumer.assignment());
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
-                    ConsumerRecords<Object, Object> allRecords = 
consumer.poll(Long.MAX_VALUE);
+                    ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollTimeoutMs);
                     for (TopicPartition partition : allRecords.partitions()) {
                         List<ConsumerRecord<Object, Object>> partitionRecords 
= allRecords
                             .records(partition);

http://git-wip-us.apache.org/repos/asf/camel/blob/3485b615/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 86bc163..eaf880f 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -21,6 +21,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class KafkaConsumerTest {
 
@@ -29,20 +30,23 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresBootstrapServers() throws Exception {
-        Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
+        when(endpoint.getGroupId()).thenReturn("groupOne");
+        when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration());
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresGroupId() throws Exception {
-        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:1234");
+        when(endpoint.getBrokers()).thenReturn("localhost:1234");
+        when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration());
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test
     public void consumerOnlyRequiresBootstrapServersAndGroupId() throws 
Exception {
-        Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
-        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:2181");
+        when(endpoint.getGroupId()).thenReturn("groupOne");
+        when(endpoint.getBrokers()).thenReturn("localhost:2181");
+        when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration());
         new KafkaConsumer(endpoint, processor);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3485b615/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
new file mode 100644
index 0000000..809a267
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.hamcrest.core.IsNot;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaConsumerTest {
+
+    @Mock
+    private KafkaConsumer<Object, Object> kafkaConsumer;
+
+    @Before
+    public void init() {
+        when(kafkaConsumer.poll(1000)).thenReturn(ConsumerRecords.empty());
+    }
+    @Test
+    public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() {
+        ConsumerRecords<Object, Object> consumerRecords = 
kafkaConsumer.poll(1000);
+        assertThat(consumerRecords, IsNot.not(nullValue()));
+    }
+
+    @Test
+    public void testPollGivenReturnsEmptyPartitionsShouldNotBeNull() {
+        ConsumerRecords<Object, Object> consumerRecords = 
kafkaConsumer.poll(1000);
+        assertThat(consumerRecords.partitions(), IsNot.not(nullValue()));
+    }
+}

Reply via email to