This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7c88a9ba0d5830a6fa9e2b7d0fb4c1576f94a4a9 Author: Yufan Sheng <[email protected]> AuthorDate: Mon Sep 5 23:51:10 2022 +0800 [FLINK-27388][Connector/pulsar] Change the topic setup logic in Pulsar runtime operator. --- .../testutils/runtime/PulsarRuntimeOperator.java | 155 ++++++++------------- 1 file changed, 58 insertions(+), 97 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index ada1bc58f41..467eb82d141 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -27,6 +27,7 @@ import org.apache.flink.shaded.guava30.com.google.common.base.Strings; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -51,7 +52,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Stream; @@ -69,7 +69,8 @@ import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtil import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest; +import static org.apache.pulsar.client.api.MessageId.earliest; +import static org.apache.pulsar.client.api.ProducerAccessMode.Shared; import static org.apache.pulsar.client.api.SubscriptionMode.Durable; import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; @@ -88,8 +89,6 @@ public class PulsarRuntimeOperator implements Closeable { private final String adminUrl; private final PulsarClient client; private final PulsarAdmin admin; - private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Producer<?>>> producers; - private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Consumer<?>>> consumers; public PulsarRuntimeOperator(String serviceUrl, String adminUrl) { this(serviceUrl, serviceUrl, adminUrl, adminUrl); @@ -110,8 +109,6 @@ public class PulsarRuntimeOperator implements Closeable { .enableTransaction(true) .build()); this.admin = sneakyClient(() -> PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()); - this.producers = new ConcurrentHashMap<>(); - this.consumers = new ConcurrentHashMap<>(); } /** @@ -162,7 +159,8 @@ public class PulsarRuntimeOperator implements Closeable { } /** - * Create a pulsar topic with given partition number. + * Create a pulsar topic with given partition number if the topic doesn't exist. We won't do + * anything for the existing topic. Make sure correctly used in the testing code. * * @param topic The name of the topic. * @param numberOfPartitions The number of partitions. We would create a non-partitioned topic @@ -212,10 +210,6 @@ public class PulsarRuntimeOperator implements Closeable { return; } - // Close all the available consumers and producers. - removeConsumers(topic); - removeProducers(topic); - if (metadata.partitions == NON_PARTITIONED) { sneakyAdmin(() -> admin().topics().delete(topicName)); } else { @@ -298,8 +292,8 @@ public class PulsarRuntimeOperator implements Closeable { */ public <T> List<MessageId> sendMessages( String topic, Schema<T> schema, String key, Collection<T> messages) { + Producer<T> producer = createProducer(topic, schema); try { - Producer<T> producer = createProducer(topic, schema); List<MessageId> messageIds = new ArrayList<>(messages.size()); for (T message : messages) { @@ -315,6 +309,15 @@ public class PulsarRuntimeOperator implements Closeable { } catch (PulsarClientException e) { sneakyThrow(e); return emptyList(); + } finally { + try { + // Waiting for all the pending messages be sent to the Pulsar. + producer.flush(); + // Directly close without the flush will drop all the pending messages. + producer.close(); + } catch (PulsarClientException e) { + // Just ignore the exception here. + } } } @@ -323,9 +326,10 @@ public class PulsarRuntimeOperator implements Closeable { * message from this topic. */ public <T> Message<T> receiveMessage(String topic, Schema<T> schema) { - try { - Consumer<T> consumer = createConsumer(topic, schema); - return drainOneMessage(consumer); + try (Consumer<T> consumer = createConsumer(topic, schema)) { + Message<T> message = consumer.receive(); + consumer.acknowledge(message.getMessageId()); + return message; } catch (PulsarClientException e) { sneakyThrow(e); return null; @@ -337,10 +341,10 @@ public class PulsarRuntimeOperator implements Closeable { * timeout. A null message would be returned if no message has been consumed from Pulsar. */ public <T> Message<T> receiveMessage(String topic, Schema<T> schema, Duration timeout) { - try { - Consumer<T> consumer = createConsumer(topic, schema); - Message<T> message = consumer.receiveAsync().get(timeout.toMillis(), MILLISECONDS); - consumer.acknowledgeCumulative(message.getMessageId()); + try (Consumer<T> consumer = createConsumer(topic, schema)) { + Message<T> message = + consumer.receive(Math.toIntExact(timeout.toMillis()), MILLISECONDS); + consumer.acknowledge(message.getMessageId()); return message; } catch (Exception e) { @@ -364,12 +368,12 @@ public class PulsarRuntimeOperator implements Closeable { return singletonList(message); } else { // Drain a fixed number of messages. - try { - Consumer<T> consumer = createConsumer(topic, schema); + try (Consumer<T> consumer = createConsumer(topic, schema)) { List<Message<T>> messages = new ArrayList<>(counts); for (int i = 0; i < counts; i++) { - Message<T> message = drainOneMessage(consumer); + Message<T> message = consumer.receive(); messages.add(message); + consumer.acknowledge(message.getMessageId()); } return messages; } catch (PulsarClientException e) { @@ -439,9 +443,6 @@ public class PulsarRuntimeOperator implements Closeable { /** This method is used for test framework. You can't close this operator manually. */ @Override public void close() throws IOException { - producers.clear(); - consumers.clear(); - if (admin != null) { admin.close(); } @@ -454,93 +455,53 @@ public class PulsarRuntimeOperator implements Closeable { private void createNonPartitionedTopic(String topic) { try { - admin().lookups().lookupTopic(topic); - sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptions(topic, 0)); + admin().topics().createNonPartitionedTopic(topic); } catch (PulsarAdminException e) { - sneakyAdmin(() -> admin().topics().createNonPartitionedTopic(topic)); + if (!(e instanceof ConflictException + && e.getMessage().equals("This topic already exists"))) { + sneakyThrow(e); + } } } private void createPartitionedTopic(String topic, int numberOfPartitions) { try { - admin().lookups().lookupPartitionedTopic(topic); - sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptions(topic, 0)); + admin().topics().createPartitionedTopic(topic, numberOfPartitions); } catch (PulsarAdminException e) { - sneakyAdmin(() -> admin().topics().createPartitionedTopic(topic, numberOfPartitions)); + if (!(e instanceof ConflictException + && e.getMessage().equals("This topic already exists"))) { + sneakyThrow(e); + } } } - @SuppressWarnings("unchecked") - private <T> Producer<T> createProducer(String topic, Schema<T> schema) - throws PulsarClientException { - TopicName topicName = TopicName.get(topic); - String name = topicName.getPartitionedTopicName(); - int index = topicName.getPartitionIndex(); - ConcurrentHashMap<Integer, Producer<?>> topicProducers = - producers.computeIfAbsent(name, d -> new ConcurrentHashMap<>()); - - return (Producer<T>) - topicProducers.computeIfAbsent( - index, - i -> { - ProducerBuilder<T> builder = - client().newProducer(schema) - .topic(topic) - .enableBatching(false) - .enableMultiSchema(true); - - return sneakyClient(builder::create); - }); - } + private synchronized <T> Producer<T> createProducer(String topic, Schema<T> schema) { + ProducerBuilder<T> builder = + client().newProducer(schema) + .topic(topic) + .enableBatching(false) + .enableMultiSchema(true) + .accessMode(Shared); - @SuppressWarnings("unchecked") - private <T> Consumer<T> createConsumer(String topic, Schema<T> schema) - throws PulsarClientException { - TopicName topicName = TopicName.get(topic); - String name = topicName.getPartitionedTopicName(); - int index = topicName.getPartitionIndex(); - ConcurrentHashMap<Integer, Consumer<?>> topicConsumers = - consumers.computeIfAbsent(name, d -> new ConcurrentHashMap<>()); - - return (Consumer<T>) - topicConsumers.computeIfAbsent( - index, - i -> { - ConsumerBuilder<T> builder = - client().newConsumer(schema) - .topic(topic) - .subscriptionName(SUBSCRIPTION_NAME) - .subscriptionMode(Durable) - .subscriptionType(Exclusive) - .subscriptionInitialPosition(Earliest); - - return sneakyClient(builder::subscribe); - }); + return sneakyClient(builder::create); } - private void removeProducers(String topic) { - String topicName = topicName(topic); - ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName); - if (integerProducers != null) { - for (Producer<?> producer : integerProducers.values()) { - sneakyClient(producer::close); - } + private synchronized <T> Consumer<T> createConsumer(String topic, Schema<T> schema) { + // Create the earliest subscription if it's not existed. + List<String> subscriptions = sneakyAdmin(() -> admin().topics().getSubscriptions(topic)); + if (!subscriptions.contains(SUBSCRIPTION_NAME)) { + sneakyAdmin( + () -> admin().topics().createSubscription(topic, SUBSCRIPTION_NAME, earliest)); } - } - private void removeConsumers(String topic) { - String topicName = topicName(topic); - ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName); - if (integerConsumers != null) { - for (Consumer<?> consumer : integerConsumers.values()) { - sneakyClient(consumer::close); - } - } - } + // Create the consumer without the initial position. + ConsumerBuilder<T> builder = + client().newConsumer(schema) + .topic(topic) + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionMode(Durable) + .subscriptionType(Exclusive); - private <T> Message<T> drainOneMessage(Consumer<T> consumer) throws PulsarClientException { - Message<T> message = consumer.receive(); - consumer.acknowledgeCumulative(message.getMessageId()); - return message; + return sneakyClient(builder::subscribe); } }
