This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 339bbb9fdb2 camel-google-pubsub: Fix test
339bbb9fdb2 is described below
commit 339bbb9fdb2a04039f3b6336a21f326139c8fd85
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 12 10:55:19 2025 +0200
camel-google-pubsub: Fix test
---
.../pubsub/integration/AcknowledgementAsyncIT.java | 119 +++++++++++++++++++++
...ntIT.java => ManualAcknowledgementAsyncIT.java} | 40 ++++---
.../integration/ManualAcknowledgementIT.java | 25 ++---
3 files changed, 149 insertions(+), 35 deletions(-)
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java
new file mode 100644
index 00000000000..9a29452c685
--- /dev/null
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AcknowledgementAsyncIT extends PubsubTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(AcknowledgementAsyncIT.class);
+
+ private static final String TOPIC_NAME = "failureSingleAsync";
+ private static final String SUBSCRIPTION_NAME = "failureSubAsync";
+ private static Boolean fail = false;
+
+ @EndpointInject("direct:in")
+ private Endpoint directIn;
+
+ @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME)
+ private Endpoint pubsubTopic;
+
+ @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=false")
+ private Endpoint pubsubSubscription;
+
+ @EndpointInject("mock:receiveResult")
+ private MockEndpoint receiveResult;
+
+ @Produce("direct:in")
+ private ProducerTemplate producer;
+
+ @Override
+ public void createTopicSubscription() {
+ createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(directIn).routeId("Send_to_Fail").to(pubsubTopic);
+
+
from(pubsubSubscription).routeId("Fail_Receive").autoStartup(true).process(new
Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (AcknowledgementAsyncIT.fail) {
+ throw new Exception("fail");
+ }
+ }
+ }).to(receiveResult);
+ }
+ };
+ }
+
+ /**
+ * Testing acknowledgements. Three checks to be performed. Check 1 :
Successful round trip. Message received and
+ * acknowledged. If the ACK fails for the first message, it will be
delivered again for the second check and the
+ * body comparison will fail. Check 2 : Failure. As the route throws and
exception and the message is NACK'ed. The
+ * message should remain in the PubSub Subscription for the third check.
Check 3 : Success for the second message.
+ * The message received should match the second message sent.
+ */
+ @Test
+ public void singleMessage() throws Exception {
+
+ Exchange firstExchange = new DefaultExchange(context);
+ Exchange secondExchange = new DefaultExchange(context);
+
+ firstExchange.getIn().setBody("SUCCESS : " +
firstExchange.getExchangeId());
+ secondExchange.getIn().setBody("fail : " +
secondExchange.getExchangeId());
+
+ // Check 1 : Successful roundtrip.
+ LOG.debug("Acknowledgement Test : Stage 1");
+ receiveResult.reset();
+ fail = false;
+ receiveResult.expectedMessageCount(1);
+
receiveResult.expectedBodiesReceivedInAnyOrder(firstExchange.getIn().getBody());
+ producer.send(firstExchange);
+ receiveResult.assertIsSatisfied(3000);
+
+ // Check 2 : Failure for the second message.
+ LOG.debug("Acknowledgement Test : Stage 2");
+ receiveResult.reset();
+ fail = true;
+ receiveResult.expectedMessageCount(0);
+ producer.send(secondExchange);
+ receiveResult.assertIsSatisfied(3000);
+
+ // Check 3 : Success for the second message.
+ LOG.debug("Acknowledgement Test : Stage 3");
+ receiveResult.reset();
+ fail = false;
+ receiveResult.expectedMessageCount(1);
+
receiveResult.expectedBodiesReceivedInAnyOrder(secondExchange.getIn().getBody());
+ receiveResult.assertIsSatisfied(3000);
+ }
+}
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java
similarity index 74%
copy from
components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
copy to
components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java
index 4ba5e8a467d..4287d3c3e14 100644
---
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java
@@ -28,16 +28,16 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ManualAcknowledgementIT extends PubsubTestSupport {
- private static final Logger LOG =
LoggerFactory.getLogger(ManualAcknowledgementIT.class);
+public class ManualAcknowledgementAsyncIT extends PubsubTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(ManualAcknowledgementAsyncIT.class);
- private static final String TOPIC_NAME = "manualAcknowledgeTopic";
- private static final String SUBSCRIPTION_NAME =
"manualAcknowledgeSubscription";
- private static final String SYNC_ROUTE_ID =
"receive-from-subscription-sync";
+ private static final String TOPIC_NAME = "manualAcknowledgeAsyncTopic";
+ private static final String SUBSCRIPTION_NAME =
"manualAcknowledgeAsyncSubscription";
+ private static final String ROUTE_ID = "receive-from-subscription";
private static Boolean ack = true;
- @EndpointInject("mock:receiveResultSync")
- private MockEndpoint receiveResultSync;
+ @EndpointInject("mock:receiveResult")
+ private MockEndpoint receiveResult;
@Produce("direct:in")
private ProducerTemplate producer;
@@ -56,16 +56,16 @@ public class ManualAcknowledgementIT extends
PubsubTestSupport {
.routeId("send-to-topic")
.to("google-pubsub:{{project.id}}:" + TOPIC_NAME);
- from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=true&ackMode=NONE")
+ from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=false&ackMode=NONE")
.autoStartup(false)
- .routeId(SYNC_ROUTE_ID)
- .to("mock:receiveResultSync")
+ .routeId(ROUTE_ID)
+ .to("mock:receiveResult")
.process(exchange -> {
GooglePubsubAcknowledge acknowledge
=
exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
GooglePubsubAcknowledge.class);
- if (ManualAcknowledgementIT.ack) {
+ if (ManualAcknowledgementAsyncIT.ack) {
acknowledge.ack(exchange);
} else {
LOG.debug("Nack!");
@@ -81,20 +81,18 @@ public class ManualAcknowledgementIT extends
PubsubTestSupport {
// 2. Synchronous consumer with manual acknowledgement.
// Message should only be received once.
producer.sendBody("Testing!");
- receiveResultSync.expectedMessageCount(1);
- context.getRouteController().startRoute(SYNC_ROUTE_ID);
- receiveResultSync.assertIsSatisfied(3000);
- context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+ receiveResult.expectedMessageCount(1);
+ context.getRouteController().startRoute(ROUTE_ID);
+ receiveResult.assertIsSatisfied(3000);
+
+ receiveResult.reset();
- receiveResultSync.reset();
ack = false;
// 4. Synchronous consumer with manual negative-acknowledgement.
// Message should be continuously redelivered after being nacked.
- producer.sendBody("Testing!");
- receiveResultSync.expectedMinimumMessageCount(3);
- context.getRouteController().startRoute(SYNC_ROUTE_ID);
- receiveResultSync.assertIsSatisfied(3000);
- context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+ producer.sendBody("Testing2!");
+ receiveResult.expectedMinimumMessageCount(3);
+ receiveResult.assertIsSatisfied(3000);
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
index 4ba5e8a467d..a40fad3ae1c 100644
---
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
@@ -33,11 +33,11 @@ public class ManualAcknowledgementIT extends
PubsubTestSupport {
private static final String TOPIC_NAME = "manualAcknowledgeTopic";
private static final String SUBSCRIPTION_NAME =
"manualAcknowledgeSubscription";
- private static final String SYNC_ROUTE_ID =
"receive-from-subscription-sync";
+ private static final String ROUTE_ID = "receive-from-subscription";
private static Boolean ack = true;
- @EndpointInject("mock:receiveResultSync")
- private MockEndpoint receiveResultSync;
+ @EndpointInject("mock:receiveResult")
+ private MockEndpoint receiveResult;
@Produce("direct:in")
private ProducerTemplate producer;
@@ -58,8 +58,8 @@ public class ManualAcknowledgementIT extends
PubsubTestSupport {
from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=true&ackMode=NONE")
.autoStartup(false)
- .routeId(SYNC_ROUTE_ID)
- .to("mock:receiveResultSync")
+ .routeId(ROUTE_ID)
+ .to("mock:receiveResult")
.process(exchange -> {
GooglePubsubAcknowledge acknowledge
=
exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
@@ -81,20 +81,17 @@ public class ManualAcknowledgementIT extends
PubsubTestSupport {
// 2. Synchronous consumer with manual acknowledgement.
// Message should only be received once.
producer.sendBody("Testing!");
- receiveResultSync.expectedMessageCount(1);
- context.getRouteController().startRoute(SYNC_ROUTE_ID);
- receiveResultSync.assertIsSatisfied(3000);
- context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+ receiveResult.expectedMessageCount(1);
+ context.getRouteController().startRoute(ROUTE_ID);
+ receiveResult.assertIsSatisfied(3000);
- receiveResultSync.reset();
+ receiveResult.reset();
ack = false;
// 4. Synchronous consumer with manual negative-acknowledgement.
// Message should be continuously redelivered after being nacked.
producer.sendBody("Testing!");
- receiveResultSync.expectedMinimumMessageCount(3);
- context.getRouteController().startRoute(SYNC_ROUTE_ID);
- receiveResultSync.assertIsSatisfied(3000);
- context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+ receiveResult.expectedMinimumMessageCount(3);
+ receiveResult.assertIsSatisfied(3000);
}
}