This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.18.x by this push:
new c0ceeccf632b CAMEL-23571: Ensure GooglePubsubComponent does not leave orphan channels (#23378) (#23392)
c0ceeccf632b is described below
commit c0ceeccf632b840cf8978756490e052185d61568
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu May 21 08:38:08 2026 +0200
CAMEL-23571: Ensure GooglePubsubComponent does not leave orphan channels (#23378) (#23392)
Co-authored-by: David Riseley <[email protected]>
---
.../google/pubsub/GooglePubsubComponent.java | 32 +++++++-------
.../google/pubsub/OrphanChannelLogAppender.java | 49 ++++++++++++++++++++++
.../component/google/pubsub/PubsubTestSupport.java | 46 +++++++++++++++-----
3 files changed, 101 insertions(+), 26 deletions(-)
diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
index 9a98f73413e2..d6049c822ccb 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
@@ -28,8 +28,7 @@ import java.util.stream.Stream;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
-import com.google.api.gax.grpc.GrpcTransportChannel;
-import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
@@ -45,7 +44,6 @@ import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
-import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.camel.Endpoint;
import org.apache.camel.RuntimeCamelException;
@@ -155,9 +153,10 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent {
throws IOException {
Publisher.Builder builder = Publisher.newBuilder(topicName);
if (StringHelper.trimToNull(endpoint) != null) {
- ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
- TransportChannelProvider channelProvider
- =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ TransportChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
+ .setEndpoint(endpoint)
+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
+ .build();
builder.setChannelProvider(channelProvider);
}
builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint));
@@ -182,9 +181,10 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent {
throws IOException {
Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName,
messageReceiver);
if (StringHelper.trimToNull(endpoint) != null) {
- ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
- TransportChannelProvider channelProvider
- =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ TransportChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
+ .setEndpoint(endpoint)
+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
+ .build();
builder.setChannelProvider(channelProvider);
}
builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint));
@@ -208,9 +208,10 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent {
}
if (StringHelper.trimToNull(endpoint) != null) {
- ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
- TransportChannelProvider channelProvider
- =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ TransportChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
+ .setEndpoint(endpoint)
+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
+ .build();
builder.setTransportChannelProvider(channelProvider);
}
builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint));
@@ -221,9 +222,10 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent {
SubscriptionAdminSettings.Builder builder =
SubscriptionAdminSettings.newBuilder();
if (StringHelper.trimToNull(endpoint) != null) {
- ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
- TransportChannelProvider channelProvider
- =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ TransportChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
+ .setEndpoint(endpoint)
+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
+ .build();
builder.setTransportChannelProvider(channelProvider);
}
builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint));
diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/OrphanChannelLogAppender.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/OrphanChannelLogAppender.java
new file mode 100644
index 000000000000..c8a600f2d248
--- /dev/null
+++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/OrphanChannelLogAppender.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+class OrphanChannelLogAppender extends AbstractAppender {
+
+ private final List<String> messages = Collections.synchronizedList(new
ArrayList<>());
+
+ OrphanChannelLogAppender() {
+ super("OrphanChannelLogAppender", null,
PatternLayout.createDefaultLayout(), false, Property.EMPTY_ARRAY);
+ start();
+ }
+
+ @Override
+ public void append(LogEvent event) {
+ messages.add(event.getMessage().getFormattedMessage());
+ }
+
+ List<String> getMessages() {
+ return Collections.unmodifiableList(messages);
+ }
+
+ void reset() {
+ messages.clear();
+ }
+}
diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
index 01acb43891bb..5483699345cf 100644
--- a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
+++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
@@ -18,13 +18,14 @@ package org.apache.camel.component.google.pubsub;
import java.io.IOException;
import java.io.InputStream;
+import java.util.List;
import java.util.Properties;
import java.util.logging.LogManager;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
-import com.google.api.gax.grpc.GrpcTransportChannel;
-import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
@@ -33,7 +34,6 @@ import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
-import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.camel.BindToRegistry;
import org.apache.camel.CamelContext;
@@ -42,6 +42,7 @@ import
org.apache.camel.test.infra.google.pubsub.services.GooglePubSubService;
import
org.apache.camel.test.infra.google.pubsub.services.GooglePubSubServiceFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.camel.test.junit5.TestSupport;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +53,8 @@ public class PubsubTestSupport extends CamelTestSupport {
public static final String PROJECT_ID;
+ private static final OrphanChannelLogAppender ORPHAN_APPENDER = new
OrphanChannelLogAppender();
+
static {
Properties testProperties = loadProperties();
PROJECT_ID = testProperties.getProperty("project.id");
@@ -65,6 +68,11 @@ public class PubsubTestSupport extends CamelTestSupport {
"Unable to setup JUL-to-slf4j logging bridge. The test
execution should result in a log of bogus output. Error: {}",
e.getMessage(), e);
}
+
+ org.apache.logging.log4j.core.Logger orphanLogger
+ = (org.apache.logging.log4j.core.Logger)
org.apache.logging.log4j.LogManager
+
.getLogger("io.grpc.internal.ManagedChannelOrphanWrapper");
+ orphanLogger.addAppender(ORPHAN_APPENDER);
}
private static Properties loadProperties() {
@@ -86,6 +94,23 @@ public class PubsubTestSupport extends CamelTestSupport {
return loadProperties();
}
+ @Override
+ protected void setupResources() throws Exception {
+ ORPHAN_APPENDER.reset();
+ super.setupResources();
+ }
+
+ @Override
+ protected void cleanupResources() throws Exception {
+ super.cleanupResources();
+ System.gc();
+ List<String> orphans = ORPHAN_APPENDER.getMessages();
+ if (!orphans.isEmpty()) {
+ Assertions.fail("gRPC channel(s) garbage collected without
shutdown (ManagedChannelOrphanWrapper):\n"
+ + String.join("\n", orphans));
+ }
+ }
+
@Override
protected CamelContext createCamelContext() throws Exception {
createTopicSubscription();
@@ -136,17 +161,16 @@ public class PubsubTestSupport extends CamelTestSupport {
subscriptionAdminClient.shutdown();
}
- private FixedTransportChannelProvider createChannelProvider() {
- ManagedChannel channel = ManagedChannelBuilder
- .forTarget(service.getServiceAddress())
- .usePlaintext()
- .build();
+ private TransportChannelProvider createChannelProvider() {
- return
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ return InstantiatingGrpcChannelProvider.newBuilder()
+ .setEndpoint(service.getServiceAddress())
+ .setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
+ .build();
}
private TopicAdminClient createTopicAdminClient() {
- FixedTransportChannelProvider channelProvider =
createChannelProvider();
+ TransportChannelProvider channelProvider = createChannelProvider();
CredentialsProvider credentialsProvider =
NoCredentialsProvider.create();
try {
@@ -161,7 +185,7 @@ public class PubsubTestSupport extends CamelTestSupport {
}
private SubscriptionAdminClient createSubscriptionAdminClient() {
- FixedTransportChannelProvider channelProvider =
createChannelProvider();
+ TransportChannelProvider channelProvider = createChannelProvider();
CredentialsProvider credentialsProvider =
NoCredentialsProvider.create();
try {