This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6a2aacc1479a8043280c405b5e6487ac69be5614
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue May 31 10:48:33 2022 +0200

    (chores) camel-kafka: fix flaky manual async commit test
---
 .../KafkaConsumerAsyncManualCommitIT.java          | 42 ++++++++++++++++------
 1 file changed, 32 insertions(+), 10 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index f1e007482d5..bfda5a5a5e8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -16,8 +16,9 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.Endpoint;
@@ -31,15 +32,23 @@ import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
     public static final String TOPIC = "testManualCommitTest";
 
@@ -74,8 +83,6 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         if (producer != null) {
             producer.close();
         }
-        // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
     }
 
     @Override
@@ -111,12 +118,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         };
     }
 
-    @RepeatedTest(4)
-    public void kafkaManualCommit() throws Exception {
+    @DisplayName("Tests that LAST_RECORD_BEFORE_COMMIT header includes a value")
+    @Order(1)
+    @Test
+    void testLastRecordBeforeCommitHeader() {
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
-        // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
-        // manual commit
+
         to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
 
         for (int k = 0; k < 5; k++) {
@@ -125,8 +133,16 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
             producer.send(data);
         }
 
-        to.assertIsSatisfied(3000);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> to.assertIsSatisfied());
+
+        List<Exchange> exchangeList = to.getExchanges();
+        assertEquals(5, exchangeList.size());
+        assertEquals(true, exchangeList.get(4).getMessage().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class));
+    }
 
+    @Order(2)
+    @Test
+    void kafkaManualCommit() throws Exception {
         to.reset();
 
         // Second step: We shut down our route, we expect nothing will be recovered by our route
@@ -141,16 +157,22 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         }
 
         to.assertIsSatisfied(3000);
+    }
 
+    @Order(3)
+    @Test
+    void testResumeFromTheRightPoint() throws Exception {
         to.reset();
 
         // Fourth step: We start again our route, since we have been committing the offsets from the first step,
         // we will expect to consume from the latest committed offset e.g from offset 5
         context.getRouteController().startRoute("foo");
+
         to.expectedMessageCount(3);
         to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
 
-        to.assertIsSatisfied(3000);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> to.assertIsSatisfied());
 
         assertEquals(0, failCount, "There should have been 0 commit failures");
     }

Reply via email to