orpiske commented on a change in pull request #6292:
URL: https://github.com/apache/camel/pull/6292#discussion_r730930389



##########
File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
##########
@@ -70,6 +83,26 @@ protected void commitOffset(StateRepository<String, String> offsetRepository, To
         }
     }
 
+    protected void commitAsyncOffset(
+            StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) {
+        if (recordOffset != -1) {

Review comment:
       You could probably use `KafkaRecordProcessor.START_OFFSET` here instead of -1.
   
   Note: I know we are using -1 in other parts of this code, but fixing that is on my cleanup TODO list.
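
   To illustrate the suggestion, here is a minimal, self-contained sketch. It is not the actual Camel code: `OffsetCheckSketch` and `shouldCommit` are hypothetical names, and the local `START_OFFSET` only stands in for `KafkaRecordProcessor.START_OFFSET`, with its value assumed to be the -1 literal used in the diff.

```java
// Hypothetical sketch: class and method names are illustrative, not part of camel-kafka.
final class OffsetCheckSketch {

    // Stand-in for KafkaRecordProcessor.START_OFFSET; the value is assumed
    // to match the -1 literal used in the diff above.
    static final long START_OFFSET = -1;

    // True when an offset was actually recorded and is therefore worth committing.
    static boolean shouldCommit(long recordOffset) {
        return recordOffset != START_OFFSET;
    }

    private OffsetCheckSketch() {
    }
}
```

   The named constant says why the value is special, which the bare literal does not.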

##########
File path: components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaManualCommit;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+
+    public static final String TOPIC = "testManualCommitTest";
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "&allowManualCommit=true&autoOffsetReset=earliest")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    @EndpointInject("mock:resultBar")
+    private MockEndpoint toBar;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from(from).routeId("foo").to("direct:aggregate");
+                // With sync manual commit, this would throw a concurrent modification exception
+                // It can be used in aggregator with completion timeout/interval for instance
+                // WARN: records from one partition must be processed by one unique thread
+                from("direct:aggregate").routeId("aggregate").to(to)
+                        .aggregate()
+                        .constant(true)
+                        .completionTimeout(1)
+                        .aggregationStrategy(AggregationStrategies.groupedExchange())
+                        .split().body()
+                        .process(e -> {
+                            KafkaManualCommit manual = e.getMessage().getBody(Exchange.class)
+                                    .getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                            assertNotNull(manual);
+                            if (e.getMessage().getBody(String.class).equals("message-0")) {
+                                // Delay the commit of the first message
+                                Thread.sleep(2000);

Review comment:
       I think it would be better to avoid the `Thread.sleep` call. My 2 cents is to use one of the Java concurrency utilities (`java.util.concurrent`), or Awaitility if necessary.
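
   As a rough illustration of the latch-based option, here is a minimal, self-contained sketch using `java.util.concurrent.CountDownLatch`. It is not a drop-in change for this test: `LatchInsteadOfSleep`, `delayedCommitRanLast`, and both latch names are hypothetical, and the worker thread only stands in for the processor that handles `message-0`. Whether the same pattern fits the route depends on which threads process the split exchanges, which this sketch does not settle.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: names are illustrative and are not part of the PR.
final class LatchInsteadOfSleep {

    // Returns true if the delayed "commit" happened only after it was released.
    static boolean delayedCommitRanLast() {
        CountDownLatch othersCommitted = new CountDownLatch(1);
        CountDownLatch firstCommitted = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Stands in for the processor handling "message-0": it waits for
            // an explicit signal instead of sleeping for a fixed 2 seconds.
            pool.submit(() -> {
                try {
                    if (othersCommitted.await(5, TimeUnit.SECONDS)) {
                        firstCommitted.countDown(); // manual.commit() would go here
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });

            // Stands in for the other messages having been committed.
            boolean firstStillPending = firstCommitted.getCount() == 1;
            othersCommitted.countDown();

            // Bounded wait, so a broken ordering fails the check instead of hanging.
            return firstStillPending && firstCommitted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(delayedCommitRanLast());
    }

    private LatchInsteadOfSleep() {
    }
}
```

   The bounded `await` keeps the test from blocking forever, and the delay lasts only as long as the other commits take rather than a fixed two seconds.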




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
