This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch backport/23378-to-camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f7e692a83cd2de52045aae1e9285e13a23d5a785 Author: David Riseley <[email protected]> AuthorDate: Wed May 20 19:42:44 2026 +0100 CAMEL-23571: Ensure GooglePubsubComponent does not leave orphan channels (#23378) --- .../google/pubsub/GooglePubsubComponent.java | 32 +++++++------- .../google/pubsub/OrphanChannelLogAppender.java | 49 ++++++++++++++++++++++ .../component/google/pubsub/PubsubTestSupport.java | 46 +++++++++++++++----- 3 files changed, 101 insertions(+), 26 deletions(-) diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java index 9a98f73413e2..d6049c822ccb 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java @@ -28,8 +28,7 @@ import java.util.stream.Stream; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.GrpcTransportChannel; -import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; @@ -45,7 +44,6 @@ import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; -import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.camel.Endpoint; import org.apache.camel.RuntimeCamelException; @@ -155,9 +153,10 @@ public class 
GooglePubsubComponent extends HeaderFilterStrategyComponent { throws IOException { Publisher.Builder builder = Publisher.newBuilder(topicName); if (StringHelper.trimToNull(endpoint) != null) { - ManagedChannel channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); - TransportChannelProvider channelProvider - = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + TransportChannelProvider channelProvider = InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(endpoint) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); builder.setChannelProvider(channelProvider); } builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint)); @@ -182,9 +181,10 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent { throws IOException { Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName, messageReceiver); if (StringHelper.trimToNull(endpoint) != null) { - ManagedChannel channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); - TransportChannelProvider channelProvider - = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + TransportChannelProvider channelProvider = InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(endpoint) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); builder.setChannelProvider(channelProvider); } builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint)); @@ -208,9 +208,10 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent { } if (StringHelper.trimToNull(endpoint) != null) { - ManagedChannel channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); - TransportChannelProvider channelProvider - = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + TransportChannelProvider channelProvider = InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(endpoint) + 
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); builder.setTransportChannelProvider(channelProvider); } builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint)); @@ -221,9 +222,10 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent { SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder(); if (StringHelper.trimToNull(endpoint) != null) { - ManagedChannel channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); - TransportChannelProvider channelProvider - = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + TransportChannelProvider channelProvider = InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(endpoint) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); builder.setTransportChannelProvider(channelProvider); } builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint)); diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/OrphanChannelLogAppender.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/OrphanChannelLogAppender.java new file mode 100644 index 000000000000..c8a600f2d248 --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/OrphanChannelLogAppender.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.layout.PatternLayout; + +class OrphanChannelLogAppender extends AbstractAppender { + + private final List<String> messages = Collections.synchronizedList(new ArrayList<>()); + + OrphanChannelLogAppender() { + super("OrphanChannelLogAppender", null, PatternLayout.createDefaultLayout(), false, Property.EMPTY_ARRAY); + start(); + } + + @Override + public void append(LogEvent event) { + messages.add(event.getMessage().getFormattedMessage()); + } + + List<String> getMessages() { + return Collections.unmodifiableList(messages); + } + + void reset() { + messages.clear(); + } +} diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java index 01acb43891bb..5483699345cf 100644 --- a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java @@ -18,13 +18,14 @@ package org.apache.camel.component.google.pubsub; import 
java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Properties; import java.util.logging.LogManager; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.GrpcTransportChannel; -import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; @@ -33,7 +34,6 @@ import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; -import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.camel.BindToRegistry; import org.apache.camel.CamelContext; @@ -42,6 +42,7 @@ import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubService; import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubServiceFactory; import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.camel.test.junit5.TestSupport; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,8 @@ public class PubsubTestSupport extends CamelTestSupport { public static final String PROJECT_ID; + private static final OrphanChannelLogAppender ORPHAN_APPENDER = new OrphanChannelLogAppender(); + static { Properties testProperties = loadProperties(); PROJECT_ID = testProperties.getProperty("project.id"); @@ -65,6 +68,11 @@ public class PubsubTestSupport extends CamelTestSupport { "Unable to setup JUL-to-slf4j logging bridge. The test execution should result in a log of bogus output. 
Error: {}", e.getMessage(), e); } + + org.apache.logging.log4j.core.Logger orphanLogger + = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager + .getLogger("io.grpc.internal.ManagedChannelOrphanWrapper"); + orphanLogger.addAppender(ORPHAN_APPENDER); } private static Properties loadProperties() { @@ -86,6 +94,23 @@ public class PubsubTestSupport extends CamelTestSupport { return loadProperties(); } + @Override + protected void setupResources() throws Exception { + ORPHAN_APPENDER.reset(); + super.setupResources(); + } + + @Override + protected void cleanupResources() throws Exception { + super.cleanupResources(); + System.gc(); + List<String> orphans = ORPHAN_APPENDER.getMessages(); + if (!orphans.isEmpty()) { + Assertions.fail("gRPC channel(s) garbage collected without shutdown (ManagedChannelOrphanWrapper):\n" + + String.join("\n", orphans)); + } + } + @Override protected CamelContext createCamelContext() throws Exception { createTopicSubscription(); @@ -136,17 +161,16 @@ public class PubsubTestSupport extends CamelTestSupport { subscriptionAdminClient.shutdown(); } - private FixedTransportChannelProvider createChannelProvider() { - ManagedChannel channel = ManagedChannelBuilder - .forTarget(service.getServiceAddress()) - .usePlaintext() - .build(); + private TransportChannelProvider createChannelProvider() { - return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + return InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(service.getServiceAddress()) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); } private TopicAdminClient createTopicAdminClient() { - FixedTransportChannelProvider channelProvider = createChannelProvider(); + TransportChannelProvider channelProvider = createChannelProvider(); CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); try { @@ -161,7 +185,7 @@ public class PubsubTestSupport extends CamelTestSupport { } private 
SubscriptionAdminClient createSubscriptionAdminClient() { - FixedTransportChannelProvider channelProvider = createChannelProvider(); + TransportChannelProvider channelProvider = createChannelProvider(); CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); try {
