Repository: camel
Updated Branches:
  refs/heads/master 4d567c485 -> f42c97e17


CAMEL-10944: camel-kafka - When consumer stop it should auto commit


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f42c97e1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f42c97e1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f42c97e1

Branch: refs/heads/master
Commit: f42c97e17f296ce3bfa7814854ac5599a0571752
Parents: 4d567c4
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Mar 5 18:48:02 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Mar 5 18:49:38 2017 +0100

----------------------------------------------------------------------
 .../camel-kafka/src/main/docs/kafka-component.adoc    |  3 ++-
 .../camel/component/kafka/KafkaConfiguration.java     | 14 ++++++++++++++
 .../apache/camel/component/kafka/KafkaConsumer.java   | 11 +++++++++++
 3 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f42c97e1/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c7e1315..f9f5aa5 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -66,7 +66,7 @@ The Kafka component is configured using the URI syntax with 
the following path a
 |=======================================================================
 {% endraw %}
 
-#### 81 query parameters:
+#### 82 query parameters:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -78,6 +78,7 @@ The Kafka component is configured using the URI syntax with 
the following path a
 | partitioner | common | 
org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The 
partitioner class for partitioning messages amongst sub-topics. The default 
partitioner is based on the hash of the key.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit 
to ZooKeeper the offset of messages already fetched by the consumer. This 
committed offset will be used when the process fails as the position from which 
the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that 
the consumer offsets are committed to zookeeper.
+| autoCommitOnStop | consumer | sync | String | Whether to perform an explicit 
auto commit when the consumer stops to ensure the broker has a commit from the 
last consumed message. This requires the option autoCommitEnable is turned on.
 | autoOffsetReset | consumer | latest | String | What to do when there is no 
initial offset in ZooKeeper or if an offset is out of range: smallest : 
automatically reset the offset to the smallest offset largest : automatically 
reset the offset to the largest offset fail: throw exception to the consumer
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the 
consumer to the Camel routing Error Handler which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages or the likes will now 
be processed as a message and handled by the routing Error Handler. By default 
the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored.
 | checkCrcs | consumer | true | Boolean | Automatically check the CRC32 of the 
records consumed. This ensures no on-the-wire or on-disk corruption to the 
messages occurred. This check adds some overhead so it may be disabled in cases 
seeking extreme performance.

http://git-wip-us.apache.org/repos/asf/camel/blob/f42c97e1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 436287b..e96614f 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -114,6 +114,8 @@ public class KafkaConfiguration {
     private String consumerId;
     @UriParam(label = "consumer", defaultValue = "true")
     private Boolean autoCommitEnable = true;
+    @UriParam(label = "consumer", defaultValue = "sync", enums = 
"sync,async,none")
+    private String autoCommitOnStop = "sync";
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
 
@@ -612,6 +614,18 @@ public class KafkaConfiguration {
         this.autoOffsetReset = autoOffsetReset;
     }
 
+    public String getAutoCommitOnStop() {
+        return autoCommitOnStop;
+    }
+
+    /**
+     * Whether to perform an explicit auto commit when the consumer stops to 
ensure the broker
+     * has a commit from the last consumed message. This requires the option 
autoCommitEnable is turned on.
+     */
+    public void setAutoCommitOnStop(String autoCommitOnStop) {
+        this.autoCommitOnStop = autoCommitOnStop;
+    }
+
     public String getBrokers() {
         return brokers;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f42c97e1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 711da6b..321aebb 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -217,6 +217,17 @@ public class KafkaConsumer extends DefaultConsumer {
                         }
                     }
                 }
+
+                if (endpoint.getConfiguration().isAutoCommitEnable() != null 
&& endpoint.getConfiguration().isAutoCommitEnable()) {
+                    if 
("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
+                        LOG.info("Auto commitAsync on stop {} from topic {}", 
threadId, topicName);
+                        consumer.commitAsync();
+                    } else if 
("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
+                        LOG.info("Auto commitSync on stop {} from topic {}", 
threadId, topicName);
+                        consumer.commitSync();
+                    }
+                }
+
                 LOG.info("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();
             } catch (InterruptException e) {

Reply via email to