This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 462453bd7eb297c4117e98de65f55b90c63428c8 Author: Yijie Shen <[email protected]> AuthorDate: Sun Mar 1 17:28:56 2020 +0800 [Flink-Connector]Get PulsarClient from cache should always return an open instance (#6436) (cherry picked from commit 2ed2eb86e50d4515bee570c339b2719614a86ecc) --- .../pulsar/client/impl/PulsarClientImpl.java | 6 +++++- .../connectors/pulsar/CachedPulsarClient.java | 8 ++++++- .../connectors/pulsar/CachedPulsarClientTest.java | 25 ++++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f51fb6b..db66c90 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -94,7 +94,7 @@ public class PulsarClientImpl implements PulsarClient { private final Timer timer; private final ExecutorProvider externalExecutorProvider; - enum State { + public enum State { Open, Closing, Closed } @@ -167,6 +167,10 @@ public class PulsarClientImpl implements PulsarClient { return clientClock; } + public AtomicReference<State> getState() { + return state; + } + @Override public ProducerBuilder<byte[]> newProducer() { return new ProducerBuilderImpl<>(this, Schema.BYTES); diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java index 613d4cc..facfbef 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java @@ -79,7 +79,13 @@ public class CachedPulsarClient { } public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException { - return guavaCache.get(config); + PulsarClientImpl instance = guavaCache.get(config); + if (instance.getState().get() == PulsarClientImpl.State.Open) { + return instance; + } else { + guavaCache.invalidate(config); + return guavaCache.get(config); + } } private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) { diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java index a41609f..39cdca1 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java @@ -100,4 +100,29 @@ public class CachedPulsarClientTest { assertEquals(map2.values().iterator().next(), client1); } + + @Test + public void getClientFromCacheShouldAlwaysReturnAnOpenedInstance() throws Exception { + PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class); + + ClientConfigurationData conf1 = new ClientConfigurationData(); + conf1.setServiceUrl(SERVICE_URL); + + PowerMockito.whenNew(PulsarClientImpl.class) + .withArguments(conf1).thenReturn(impl1); + + PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1); + + ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap(); + assertEquals(map1.size(), 1); + + client1.getState().set(PulsarClientImpl.State.Closed); + + PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf1); + + assertNotEquals(client1, client2); + + ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap(); + assertEquals(map2.size(), 1); + } }
