This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 136add5d0c9c5b9b2869a9ee194f78449065b18e Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Tue Feb 15 22:22:19 2022 +0800 [FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink. --- .../common/utils/PulsarTransactionUtils.java | 68 ++++ .../flink/connector/pulsar/sink/PulsarSink.java | 136 ++++++++ .../connector/pulsar/sink/PulsarSinkBuilder.java | 354 +++++++++++++++++++++ .../connector/pulsar/sink/PulsarSinkOptions.java | 14 +- .../pulsar/sink/committer/PulsarCommittable.java | 71 +++++ .../committer/PulsarCommittableSerializer.java | 65 ++++ .../pulsar/sink/committer/PulsarCommitter.java | 174 ++++++++++ .../pulsar/sink/config/SinkConfiguration.java | 17 +- .../connector/pulsar/sink/writer/PulsarWriter.java | 264 +++++++++++++++ .../sink/writer/context/PulsarSinkContext.java | 46 +++ .../sink/writer/context/PulsarSinkContextImpl.java | 61 ++++ .../sink/writer/router/KeyHashTopicRouter.java | 71 +++++ .../pulsar/sink/writer/router/MessageKeyHash.java | 85 +++++ .../sink/writer/router/RoundRobinTopicRouter.java | 63 ++++ .../pulsar/sink/writer/router/TopicRouter.java | 64 ++++ .../sink/writer/router/TopicRoutingMode.java | 87 +++++ .../sink/writer/topic/TopicMetadataListener.java | 173 ++++++++++ .../sink/writer/topic/TopicProducerRegister.java | 202 ++++++++++++ 18 files changed, 2011 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java new file mode 100644 index 0000000..a48b4d4 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; +import static org.apache.flink.util.ExceptionUtils.findThrowable; + +/** A suit of workarounds for the Pulsar Transaction. */ +@Internal +public final class PulsarTransactionUtils { + + private PulsarTransactionUtils() { + // No public constructor + } + + /** Create transaction with given timeout millis. 
*/ + public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) { + try { + CompletableFuture<Transaction> future = + sneakyClient(pulsarClient::newTransaction) + .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS) + .build(); + + return future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + throw new FlinkRuntimeException(e); + } + } + + /** + * This is a bug in original {@link TransactionCoordinatorClientException#unwrap(Throwable)} + * method. Pulsar wraps the {@link ExecutionException} which hides the real execution exception. + */ + public static TransactionCoordinatorClientException unwrap( + TransactionCoordinatorClientException e) { + return findThrowable(e.getCause(), TransactionCoordinatorClientException.class).orElse(e); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java new file mode 100644 index 0000000..811d5b5 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter; +import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The Sink implementation of Pulsar. Please use a {@link PulsarSinkBuilder} to construct a {@link + * PulsarSink}. The following example shows how to create a PulsarSink receiving records of {@code + * String} type. 
+ * + * <pre>{@code + * PulsarSink<String> sink = PulsarSink.builder() + * .setServiceUrl(operator().serviceUrl()) + * .setAdminUrl(operator().adminUrl()) + * .setTopics(topic) + * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING)) + * .build(); + * }</pre> + * + * <p>The sink supports all delivery guarantees described by {@link DeliveryGuarantee}. + * + * <ul> + * <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: messages may be lost in + * case of issues on the Pulsar broker and messages may be duplicated in case of a Flink + * failure. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE}: the sink will wait for all outstanding records in + * the Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages + * will be lost in case of any issue with the Pulsar brokers but messages may be duplicated + * when Flink restarts. + * <li>{@link DeliveryGuarantee#EXACTLY_ONCE}: In this mode the PulsarSink will write all messages + * in a Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no + * duplicates will be seen in case of a Flink restart. However, this delays record writing + * effectively until a checkpoint is written, so adjust the checkpoint duration accordingly. + * Additionally, it is highly recommended to set the Pulsar transaction timeout ({@link + * PulsarSinkOptions#PULSAR_WRITE_TRANSACTION_TIMEOUT}) much greater than the maximum + * checkpoint duration plus the maximum restart duration, or data loss may happen when Pulsar + * expires an uncommitted transaction. + * </ul> + * + * <p>See {@link PulsarSinkBuilder} for more details. + * + * @param <IN> The input type of the sink.
+ */ +@PublicEvolving +public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommittable> { + private static final long serialVersionUID = 4416714587951282119L; + + private final SinkConfiguration sinkConfiguration; + private final PulsarSerializationSchema<IN> serializationSchema; + private final TopicMetadataListener metadataListener; + private final TopicRouter<IN> topicRouter; + + PulsarSink( + SinkConfiguration sinkConfiguration, + PulsarSerializationSchema<IN> serializationSchema, + TopicMetadataListener metadataListener, + TopicRoutingMode topicRoutingMode, + TopicRouter<IN> topicRouter) { + this.sinkConfiguration = checkNotNull(sinkConfiguration); + this.serializationSchema = checkNotNull(serializationSchema); + this.metadataListener = checkNotNull(metadataListener); + checkNotNull(topicRoutingMode); + + // Create topic router supplier. + if (topicRoutingMode == TopicRoutingMode.CUSTOM) { + this.topicRouter = checkNotNull(topicRouter); + } else if (topicRoutingMode == TopicRoutingMode.ROUND_ROBIN) { + this.topicRouter = new RoundRobinTopicRouter<>(sinkConfiguration); + } else { + this.topicRouter = new KeyHashTopicRouter<>(sinkConfiguration); + } + } + + /** + * Create a {@link PulsarSinkBuilder} to construct a new {@link PulsarSink}. + * + * @param <IN> Type of incoming records. + * @return A Pulsar sink builder. 
+ */ + public static <IN> PulsarSinkBuilder<IN> builder() { + return new PulsarSinkBuilder<>(); + } + + @Internal + @Override + public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext initContext) { + return new PulsarWriter<>( + sinkConfiguration, serializationSchema, metadataListener, topicRouter, initContext); + } + + @Internal + @Override + public Committer<PulsarCommittable> createCommitter() { + return new PulsarCommitter(sinkConfiguration); + } + + @Internal + @Override + public SimpleVersionedSerializer<PulsarCommittable> getCommittableSerializer() { + return new PulsarCommittableSerializer(); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java new file mode 100644 index 0000000..a0352f5 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder; +import org.apache.flink.connector.pulsar.common.config.PulsarOptions; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; + +import org.apache.pulsar.client.api.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT; +import static 
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The builder class for {@link PulsarSink} to make it easier for the users to construct a {@link + * PulsarSink}. + * + * <p>The following example shows the minimum setup to create a PulsarSink that writes String + * values to a Pulsar topic. + * + * <pre>{@code + * PulsarSink<String> sink = PulsarSink.builder() + * .setServiceUrl(operator().serviceUrl()) + * .setAdminUrl(operator().adminUrl()) + * .setTopics(topic) + * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING)) + * .build(); + * }</pre> + * + * <p>The service url, admin url, and the record serializer are required fields that must be set. If + * you don't set the topics, make sure you have provided a custom {@link TopicRouter}. Otherwise, + * you must provide the topics to write to. + * + * <p>To specify the delivery guarantees of PulsarSink, one can call {@link + * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the delivery guarantee is {@link + * DeliveryGuarantee#NONE}, which gives no consistency guarantee when writing messages into + * Pulsar. + * + * <pre>{@code + * PulsarSink<String> sink = PulsarSink.builder() + * .setServiceUrl(operator().serviceUrl()) + * .setAdminUrl(operator().adminUrl()) + * .setTopics(topic) + * .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING)) + * .setDeliveryGuarantee(deliveryGuarantee) + * .build(); + * }</pre> + * + * @see PulsarSink for a more detailed explanation of the different guarantees. + * @param <IN> The input type of the sink.
+ */ +@PublicEvolving +public class PulsarSinkBuilder<IN> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkBuilder.class); + + private final PulsarConfigBuilder configBuilder; + + private PulsarSerializationSchema<IN> serializationSchema; + private TopicMetadataListener metadataListener; + private TopicRoutingMode topicRoutingMode; + private TopicRouter<IN> topicRouter; + + // private builder constructor. + PulsarSinkBuilder() { + this.configBuilder = new PulsarConfigBuilder(); + } + + /** + * Sets the admin endpoint for the PulsarAdmin of the PulsarSink. + * + * @param adminUrl The url for the PulsarAdmin. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setAdminUrl(String adminUrl) { + return setConfig(PULSAR_ADMIN_URL, adminUrl); + } + + /** + * Sets the server's link for the PulsarProducer of the PulsarSink. + * + * @param serviceUrl The server url of the Pulsar cluster. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setServiceUrl(String serviceUrl) { + return setConfig(PULSAR_SERVICE_URL, serviceUrl); + } + + /** + * The producer name is informative, and it can be used to identify a particular producer + * instance from the topic stats. + * + * @param producerName The name of the producer used in Pulsar sink. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setProducerName(String producerName) { + return setConfig(PULSAR_PRODUCER_NAME, producerName); + } + + /** + * Set a pulsar topic list for flink sink. Some topic may not exist currently, write to this + * non-existed topic wouldn't throw any exception. + * + * @param topics The topic list you would like to consume message. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setTopics(String... topics) { + return setTopics(Arrays.asList(topics)); + } + + /** + * Set a pulsar topic list for flink sink. 
Some topic may not exist currently, consuming this + * non-existed topic wouldn't throw any exception. + * + * @param topics The topic list you would like to consume message. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setTopics(List<String> topics) { + checkState(metadataListener == null, "setTopics couldn't be set twice."); + // Making sure the topic should be distinct. + List<String> topicSet = distinctTopics(topics); + this.metadataListener = new TopicMetadataListener(topicSet); + return this; + } + + /** + * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link + * DeliveryGuarantee#NONE}. + * + * @param deliveryGuarantee Deliver guarantees. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { + checkNotNull(deliveryGuarantee, "deliveryGuarantee"); + configBuilder.override(PULSAR_WRITE_DELIVERY_GUARANTEE, deliveryGuarantee); + return this; + } + + /** + * Set a routing mode for choosing right topic partition to send messages. + * + * @param topicRoutingMode Routing policy for choosing the desired topic. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setTopicRoutingMode(TopicRoutingMode topicRoutingMode) { + checkArgument( + topicRoutingMode != TopicRoutingMode.CUSTOM, + "CUSTOM mode should be set by using setTopicRouter method."); + this.topicRoutingMode = checkNotNull(topicRoutingMode, "topicRoutingMode"); + return this; + } + + /** + * Use a custom topic router instead predefine topic routing. + * + * @param topicRouter The router for choosing topic to send message. + * @return this PulsarSinkBuilder. 
+ */ + public PulsarSinkBuilder<IN> setTopicRouter(TopicRouter<IN> topicRouter) { + if (topicRoutingMode != null && topicRoutingMode != TopicRoutingMode.CUSTOM) { + LOG.warn("We would override topicRoutingMode to CUSTOM if you provide TopicRouter."); + } + this.topicRoutingMode = TopicRoutingMode.CUSTOM; + this.topicRouter = checkNotNull(topicRouter, "topicRouter"); + return this; + } + + /** + * Sets the {@link PulsarSerializationSchema} that transforms incoming records to bytes. + * + * @param serializationSchema Pulsar specified serialize logic. + * @return this PulsarSinkBuilder. + */ + public <T extends IN> PulsarSinkBuilder<T> setSerializationSchema( + PulsarSerializationSchema<T> serializationSchema) { + PulsarSinkBuilder<T> self = specialized(); + self.serializationSchema = serializationSchema; + return self; + } + + /** + * If you enable this option, we would consume and deserialize the message by using Pulsar + * {@link Schema}. + * + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> enableSchemaEvolution() { + configBuilder.override(PULSAR_WRITE_SCHEMA_EVOLUTION, true); + return this; + } + + /** + * Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found + * in {@link PulsarSinkOptions} and {@link PulsarOptions}. + * + * <p>Make sure the option could be set only once or with same value. + * + * @param key The key of the property. + * @param value The value of the property. + * @return this PulsarSinkBuilder. + */ + public <T> PulsarSinkBuilder<IN> setConfig(ConfigOption<T> key, T value) { + configBuilder.set(key, value); + return this; + } + + /** + * Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be found + * in {@link PulsarSinkOptions} and {@link PulsarOptions}. + * + * @param config The config to set for the PulsarSink. + * @return this PulsarSinkBuilder. 
+ */ + public PulsarSinkBuilder<IN> setConfig(Configuration config) { + configBuilder.set(config); + return this; + } + + /** + * Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be found + * in {@link PulsarSinkOptions} and {@link PulsarOptions}. + * + * <p>This method is mainly used for future flink SQL binding. + * + * @param properties The config properties to set for the PulsarSink. + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setProperties(Properties properties) { + configBuilder.set(properties); + return this; + } + + /** + * Build the {@link PulsarSink}. + * + * @return a PulsarSink with the settings made for this builder. + */ + public PulsarSink<IN> build() { + // Change delivery guarantee. + DeliveryGuarantee deliveryGuarantee = configBuilder.get(PULSAR_WRITE_DELIVERY_GUARANTEE); + if (deliveryGuarantee == DeliveryGuarantee.NONE) { + LOG.warn( + "You haven't set delivery guarantee or set it to NONE, this would cause data loss. 
Make sure you have known this shortcoming."); + } else if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { + LOG.info( + "Exactly once require flink checkpoint and your pulsar cluster should support the transaction."); + configBuilder.override(PULSAR_ENABLE_TRANSACTION, true); + configBuilder.override(PULSAR_SEND_TIMEOUT_MS, 0L); + + if (!configBuilder.contains(PULSAR_WRITE_TRANSACTION_TIMEOUT)) { + LOG.warn( + "The default pulsar transaction timeout is 3 hours, make sure it was greater than your checkpoint interval."); + } else { + Long timeout = configBuilder.get(PULSAR_WRITE_TRANSACTION_TIMEOUT); + LOG.warn( + "The configured transaction timeout is {} mille seconds, make sure it was greater than your checkpoint interval.", + timeout); + } + } + + if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) { + LOG.warn( + "We recommend set a readable producer name through setProducerName(String) in production mode."); + } + + checkNotNull(serializationSchema, "serializationSchema must be set."); + if (serializationSchema instanceof PulsarSchemaWrapper + && !Boolean.TRUE.equals(configBuilder.get(PULSAR_WRITE_SCHEMA_EVOLUTION))) { + LOG.info( + "It seems like you want to send message in Pulsar Schema." + + " You can enableSchemaEvolution for using this feature." + + " We would use Schema.BYTES as the default schema if you don't enable this option."); + } + + // Topic metadata listener validation. + if (metadataListener == null) { + if (topicRouter == null) { + throw new NullPointerException( + "No topic names or custom topic router are provided."); + } else { + LOG.warn( + "No topic set has been provided, make sure your custom topic router support empty topic set."); + this.metadataListener = new TopicMetadataListener(); + } + } + + // Topic routing mode validate. + if (topicRoutingMode == null) { + LOG.info("No topic routing mode has been chosen. 
We use round-robin mode as default."); + this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN; + } + + // This is an unmodifiable configuration for Pulsar. + // We don't use Pulsar's built-in configure classes for compatible requirement. + SinkConfiguration sinkConfiguration = + configBuilder.build(SINK_CONFIG_VALIDATOR, SinkConfiguration::new); + + return new PulsarSink<>( + sinkConfiguration, + serializationSchema, + metadataListener, + topicRoutingMode, + topicRouter); + } + + // ------------- private helpers -------------- + + /** Helper method for java compiler recognize the generic type. */ + @SuppressWarnings("unchecked") + private <T extends IN> PulsarSinkBuilder<T> specialized() { + return (PulsarSinkBuilder<T>) this; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java index 0e16830..3a7c5bc 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.description.Description; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.config.PulsarOptions; +import org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash; import org.apache.pulsar.client.api.CompressionType; @@ -38,12 +39,13 @@ import static org.apache.flink.configuration.description.LinkElement.link; import static org.apache.flink.configuration.description.TextElement.code; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX; import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX; +import static org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash.MURMUR3_32_HASH; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; /** - * Configurations for PulsarSink. All the options list here could be configured in {@code + * Configurations for PulsarSink. All the options list here could be configured in {@link * PulsarSinkBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required * for pulsar source. * @@ -99,6 +101,13 @@ public final class PulsarSinkOptions { .withDescription( "Auto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes."); + public static final ConfigOption<MessageKeyHash> PULSAR_MESSAGE_KEY_HASH = + ConfigOptions.key(SINK_CONFIG_PREFIX + "messageKeyHash") + .enumType(MessageKeyHash.class) + .defaultValue(MURMUR3_32_HASH) + .withDescription( + "The hash policy for routing message by calculating the hash code of message key."); + public static final ConfigOption<Boolean> PULSAR_WRITE_SCHEMA_EVOLUTION = ConfigOptions.key(SINK_CONFIG_PREFIX + "enableSchemaEvolution") .booleanType() @@ -106,7 +115,8 @@ public final class PulsarSinkOptions { .withDescription( Description.builder() .text( - "If you enable this option, we would consume and deserialize the message by using Pulsar's %s.", + "If you enable this option and use PulsarSerializationSchema.pulsarSchema()," + + " we would consume and deserialize the message by using Pulsar's %s.", code("Schema")) .build()); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java new file mode 100644 index 0000000..cca8e80 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittable.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.committer; + +import org.apache.flink.annotation.Internal; + +import org.apache.pulsar.client.api.transaction.TxnID; + +import java.util.Objects; + +/** The writer state for Pulsar connector. We would used in Pulsar committer. */ +@Internal +public class PulsarCommittable { + + /** The transaction id. */ + private final TxnID txnID; + + /** The topic name with partition information. 
*/ + private final String topic; + + public PulsarCommittable(TxnID txnID, String topic) { + this.txnID = txnID; + this.topic = topic; + } + + public TxnID getTxnID() { + return txnID; + } + + public String getTopic() { + return topic; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PulsarCommittable that = (PulsarCommittable) o; + return Objects.equals(txnID, that.txnID) && Objects.equals(topic, that.topic); + } + + @Override + public int hashCode() { + return Objects.hash(txnID, topic); + } + + @Override + public String toString() { + return "PulsarCommittable{" + "txnID=" + txnID + ", topic='" + topic + '\'' + '}'; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java new file mode 100644 index 0000000..324a7c6 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommittableSerializer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.committer; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.pulsar.client.api.transaction.TxnID; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** Serializes a {@link PulsarCommittable} as its two txn id longs followed by the topic. */ +public class PulsarCommittableSerializer implements SimpleVersionedSerializer<PulsarCommittable> { + + private static final int CURRENT_VERSION = 1; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(PulsarCommittable obj) throws IOException { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + TxnID txnID = obj.getTxnID(); + out.writeLong(txnID.getMostSigBits()); + out.writeLong(txnID.getLeastSigBits()); + out.writeUTF(obj.getTopic()); + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public PulsarCommittable deserialize(int version, byte[] serialized) throws IOException { + try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + final DataInputStream in = new DataInputStream(bais)) { + long mostSigBits = in.readLong(); + long leastSigBits = in.readLong(); + TxnID txnID = new TxnID(mostSigBits, leastSigBits); + String topic = in.readUTF(); + return new PulsarCommittable(txnID, topic); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java new file mode 100644 index 0000000..8389bdc --- /dev/null +++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.committer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils; +import org.apache.flink.connector.pulsar.sink.PulsarSink; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorNotFoundException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException; +import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN; + +/** + * Committer implementation for {@link PulsarSink}. + * + * <p>The committer is responsible for finalizing the Pulsar transactions by committing them. + */ +@Internal +public class PulsarCommitter implements Committer<PulsarCommittable>, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(PulsarCommitter.class); + + private final SinkConfiguration sinkConfiguration; + + private PulsarClient pulsarClient; + private TransactionCoordinatorClient coordinatorClient; + + public PulsarCommitter(SinkConfiguration sinkConfiguration) { + this.sinkConfiguration = checkNotNull(sinkConfiguration); + } + + @Override + public void commit(Collection<CommitRequest<PulsarCommittable>> requests) + throws IOException, InterruptedException { + TransactionCoordinatorClient client = transactionCoordinatorClient(); + + for (CommitRequest<PulsarCommittable> request : requests) { + PulsarCommittable committable = request.getCommittable(); + TxnID txnID = committable.getTxnID(); + String topic = committable.getTopic(); + + LOG.debug("Start committing the Pulsar transaction {} for topic {}", txnID, topic); + try { + client.commit(txnID); + } catch (TransactionCoordinatorClientException e) { + // Known Pulsar transaction bug: specific exception types can't be caught directly,
+ // so unwrap the exception and dispatch on its type with instanceof. + TransactionCoordinatorClientException ex = PulsarTransactionUtils.unwrap(e); + if (ex instanceof CoordinatorNotFoundException) { + LOG.error( + "We couldn't find the Transaction Coordinator from Pulsar broker {}. " + + "Check your broker configuration.", + committable, + ex); + request.signalFailedWithKnownReason(ex); + } else if (ex instanceof InvalidTxnStatusException) { + LOG.error( + "Unable to commit transaction ({}) because it's in an invalid state. " + + "Most likely the transaction has been aborted for some reason. " + + "Please check the Pulsar broker logs for more details.", + committable, + ex); + request.signalAlreadyCommitted(); + } else if (ex instanceof TransactionNotFoundException) { + if (request.getNumberOfRetries() == 0) { + LOG.error( + "Unable to commit transaction ({}) because it's not found on Pulsar broker. " + + "Most likely the checkpoint interval exceed the transaction timeout.", + committable, + ex); + request.signalFailedWithKnownReason(ex); + } else { + LOG.warn( + "We can't find the transaction {} after {} retry committing. " + + "This may mean that the transaction have been committed in previous but failed with timeout. " + + "So we just mark it as committed.", + txnID, + request.getNumberOfRetries()); + request.signalAlreadyCommitted(); + } + } else if (ex instanceof MetaStoreHandlerNotExistsException) { + LOG.error( + "We can't find the meta store handler by the mostSigBits from TxnID {}.
" + + "Did you change the metadata for topic {}?", + committable, + TRANSACTION_COORDINATOR_ASSIGN, + ex); + request.signalFailedWithKnownReason(ex); + } else { + LOG.error( + "Encountered retriable exception while committing transaction {} for topic {}.", + committable, + topic, + ex); + int maxRecommitTimes = sinkConfiguration.getMaxRecommitTimes(); + if (request.getNumberOfRetries() < maxRecommitTimes) { + request.retryLater(); + } else { + String message = + String.format( + "Failed to commit transaction %s after retrying %d times", + txnID, maxRecommitTimes); + request.signalFailedWithKnownReason(new FlinkRuntimeException(message, ex)); + } + } + } catch (Exception e) { + LOG.error( + "Transaction ({}) encountered unknown error and data could be potentially lost.", + committable, + e); + request.signalFailedWithUnknownReason(e); + } + } + } + + /** + * Lazily initialize the backing Pulsar client. This committer may not be used with {@link + * DeliveryGuarantee#NONE} or {@link DeliveryGuarantee#AT_LEAST_ONCE}, so we don't create + * the Pulsar client up front. + */ + private TransactionCoordinatorClient transactionCoordinatorClient() { + if (coordinatorClient == null) { + this.pulsarClient = createClient(sinkConfiguration); + this.coordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient(); + + // Fail fast if transaction support isn't enabled on the Pulsar client.
+ checkNotNull(coordinatorClient, "You haven't enable transaction in Pulsar client."); + } + + return coordinatorClient; + } + + @Override + public void close() throws IOException { + if (pulsarClient != null) { + pulsarClient.close(); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java index e0ef7ff..fe1204e 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java @@ -23,6 +23,9 @@ import org.apache.flink.api.connector.sink.Sink.InitContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter; +import org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper; import org.apache.pulsar.client.api.Schema; @@ -31,6 +34,7 @@ import java.util.Objects; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL; import static
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION; @@ -45,6 +49,7 @@ public class SinkConfiguration extends PulsarConfiguration { private final long transactionTimeoutMillis; private final long topicMetadataRefreshInterval; private final int partitionSwitchSize; + private final MessageKeyHash messageKeyHash; private final boolean enableSchemaEvolution; private final int maxPendingMessages; private final int maxRecommitTimes; @@ -56,12 +61,13 @@ public class SinkConfiguration extends PulsarConfiguration { this.transactionTimeoutMillis = getLong(PULSAR_WRITE_TRANSACTION_TIMEOUT); this.topicMetadataRefreshInterval = getLong(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL); this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES); + this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH); this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION); this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS); this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES); } - /** The delivery guarantee changes the behavior of {@code PulsarWriter}. */ + /** The delivery guarantee changes the behavior of {@link PulsarWriter}. */ public DeliveryGuarantee getDeliveryGuarantee() { return deliveryGuarantee; } @@ -92,9 +98,14 @@ public class SinkConfiguration extends PulsarConfiguration { return partitionSwitchSize; } + /** The hash function applied to message keys when routing a message to a Pulsar partition. */ + public MessageKeyHash getMessageKeyHash() { + return messageKeyHash; + } + /** * If we should serialize and send the message with a specified Pulsar {@link Schema} instead - * the default {@link Schema#BYTES}. This switch is only used for {@code PulsarSchemaWrapper}. + * the default {@link Schema#BYTES}. This switch is only used for {@link PulsarSchemaWrapper}.
*/ public boolean isEnableSchemaEvolution() { return enableSchemaEvolution; @@ -129,6 +140,7 @@ public class SinkConfiguration extends PulsarConfiguration { && topicMetadataRefreshInterval == that.topicMetadataRefreshInterval && partitionSwitchSize == that.partitionSwitchSize && enableSchemaEvolution == that.enableSchemaEvolution + && messageKeyHash == that.messageKeyHash && maxPendingMessages == that.maxPendingMessages && maxRecommitTimes == that.maxRecommitTimes; } @@ -140,6 +152,7 @@ public class SinkConfiguration extends PulsarConfiguration { transactionTimeoutMillis, topicMetadataRefreshInterval, partitionSwitchSize, + messageKeyHash, enableSchemaEvolution, maxPendingMessages, maxRecommitTimes); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java new file mode 100644 index 0000000..9b3c931 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.pulsar.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static java.util.Collections.emptyList; +import static org.apache.flink.util.IOUtils.closeAll; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is responsible for writing records to Pulsar topics and handling the different
delivery + * {@link DeliveryGuarantee}s. + * + * @param <IN> The type of the input elements. + */ +@Internal +public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class); + + private final SinkConfiguration sinkConfiguration; + private final PulsarSerializationSchema<IN> serializationSchema; + private final TopicMetadataListener metadataListener; + private final TopicRouter<IN> topicRouter; + private final DeliveryGuarantee deliveryGuarantee; + private final PulsarSinkContext sinkContext; + private final MailboxExecutor mailboxExecutor; + private final TopicProducerRegister producerRegister; + + private long pendingMessages = 0; + + /** + * Constructor creating a Pulsar writer. + * + * <p>It will throw a {@link RuntimeException} if {@link + * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)} + * fails. + * + * @param sinkConfiguration The configuration to configure the Pulsar producer. + * @param serializationSchema Transforms incoming records into Pulsar messages. + * @param metadataListener The listener for querying topic metadata. + * @param topicRouter Topic router to choose topic by incoming records. + * @param initContext Context to provide information about the runtime environment.
+ */ + public PulsarWriter( + SinkConfiguration sinkConfiguration, + PulsarSerializationSchema<IN> serializationSchema, + TopicMetadataListener metadataListener, + TopicRouter<IN> topicRouter, + InitContext initContext) { + this.sinkConfiguration = checkNotNull(sinkConfiguration); + this.serializationSchema = checkNotNull(serializationSchema); + this.metadataListener = checkNotNull(metadataListener); + this.topicRouter = checkNotNull(topicRouter); + checkNotNull(initContext); + + this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee(); + this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration); + this.mailboxExecutor = initContext.getMailboxExecutor(); + + // Initialize topic metadata listener. + LOG.debug("Initialize topic metadata after creating Pulsar writer."); + ProcessingTimeService timeService = initContext.getProcessingTimeService(); + this.metadataListener.open(sinkConfiguration, timeService); + + // Initialize topic router. + this.topicRouter.open(sinkConfiguration); + + // Initialize the serialization schema. + try { + InitializationContext initializationContext = + initContext.asSerializationSchemaInitializationContext(); + this.serializationSchema.open(initializationContext, sinkContext, sinkConfiguration); + } catch (Exception e) { + throw new FlinkRuntimeException("Cannot initialize schema.", e); + } + + // Create this producer register after opening serialization schema! + this.producerRegister = new TopicProducerRegister(sinkConfiguration); + } + + @Override + public void write(IN element, Context context) throws IOException, InterruptedException { + PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext); + + // Route the message to one of the currently available topics. + String key = message.getKey(); + List<String> availableTopics = metadataListener.availableTopics(); + String topic = topicRouter.route(element, key, availableTopics, sinkContext); + + // Create message builder for sending message.
+ TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message); + + // Perform message sending. + if (deliveryGuarantee == DeliveryGuarantee.NONE) { + // Fire and forget: the send result is ignored, so failures may lose data. + builder.sendAsync(); + } else { + // Wait for a permit: at most maxPendingMessages sends may be pending at once. + requirePermits(); + mailboxExecutor.execute( + () -> enqueueMessageSending(topic, builder), + "Failed to send message to Pulsar"); + } + } + + private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder) + throws ExecutionException, InterruptedException { + // Block the mailbox thread until the send completes; get() surfaces send failures. + builder.sendAsync() + .whenComplete( + (id, ex) -> { + this.releasePermits(); + if (ex != null) { + throw new FlinkRuntimeException( + "Failed to send data to Pulsar " + topic, ex); + } else { + LOG.debug( + "Sent message to Pulsar {} with message id {}", topic, id); + } + }) + .get(); + } + + private void requirePermits() throws InterruptedException { + while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) { + LOG.info("Waiting for the available permits."); + mailboxExecutor.yield(); + } + pendingMessages++; + } + + private void releasePermits() { + this.pendingMessages -= 1; + } + + @SuppressWarnings("rawtypes") + private TypedMessageBuilder<?> createMessageBuilder( + String topic, Context context, PulsarMessage<?> message) { + + Schema<?> schema = message.getSchema(); + TypedMessageBuilder<?> builder = producerRegister.createMessageBuilder(topic, schema); + + byte[] orderingKey = message.getOrderingKey(); + if (orderingKey != null && orderingKey.length > 0) { + builder.orderingKey(orderingKey); + } + + String key = message.getKey(); + if (!Strings.isNullOrEmpty(key)) { + builder.key(key); + } + + long eventTime = message.getEventTime(); + if (eventTime > 0) { + builder.eventTime(eventTime); + } else { + // Otherwise fall back to the Flink record timestamp, if one is present.
+ Long timestamp = context.timestamp(); + if (timestamp != null) { + builder.eventTime(timestamp); + } + } + + // With schema evolution, the Pulsar Schema in TypedMessageBuilder serializes the value. + // The raw cast relies on the type check in PulsarMessageBuilder#value. + ((TypedMessageBuilder) builder).value(message.getValue()); + + Map<String, String> properties = message.getProperties(); + if (properties != null && !properties.isEmpty()) { + builder.properties(properties); + } + + Long sequenceId = message.getSequenceId(); + if (sequenceId != null) { + builder.sequenceId(sequenceId); + } + + List<String> clusters = message.getReplicationClusters(); + if (clusters != null && !clusters.isEmpty()) { + builder.replicationClusters(clusters); + } + + if (message.isDisableReplication()) { + builder.disableReplication(); + } + + return builder; + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + if (endOfInput) { + // At end of input, flush the producers once without waiting on pendingMessages. + producerRegister.flush(); + } else { + while (pendingMessages != 0 && deliveryGuarantee != DeliveryGuarantee.NONE) { + producerRegister.flush(); + LOG.info("Flush the pending messages to Pulsar."); + mailboxExecutor.yield(); + } + } + } + + @Override + public Collection<PulsarCommittable> prepareCommit() { + if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { + return producerRegister.prepareCommit(); + } else { + return emptyList(); + } + } + + @Override + public void close() throws Exception { + // Close all resources, rethrowing any failure only after every close was attempted.
+ closeAll(metadataListener, producerRegister); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java new file mode 100644 index 0000000..5c93339 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.context; + +import org.apache.flink.annotation.PublicEvolving; + +/** Runtime information about the Pulsar sink subtask, passed to serializers and routers. */ +@PublicEvolving +public interface PulsarSinkContext { + + /** + * Get the number of the subtask that PulsarSink is running on. The numbering starts from 0 and + * goes up to parallelism-1, where parallelism is {@link #getNumberOfParallelInstances()}. + * + * @return the index of this subtask + */ + int getParallelInstanceId(); + + /** @return number of parallel PulsarSink tasks.
*/ + int getNumberOfParallelInstances(); + + /** + * Pulsar can check the schema and upgrade the schema automatically. If this option is enabled, + * the record isn't serialized into bytes up front; the Pulsar client serializes it on send. + */ + boolean isEnableSchemaEvolution(); + + /** Returns the current processing time in Flink. */ + long processTime(); +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java new file mode 100644 index 0000000..681b25a --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.context; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; + +/** A {@link PulsarSinkContext} built from the sink's {@link InitContext} and configuration. */ +@Internal +public class PulsarSinkContextImpl implements PulsarSinkContext { + + private final int numberOfParallelSubtasks; + private final int parallelInstanceId; + private final ProcessingTimeService processingTimeService; + private final boolean enableSchemaEvolution; + + public PulsarSinkContextImpl(InitContext initContext, SinkConfiguration sinkConfiguration) { + this.parallelInstanceId = initContext.getSubtaskId(); + this.numberOfParallelSubtasks = initContext.getNumberOfParallelSubtasks(); + this.processingTimeService = initContext.getProcessingTimeService(); + this.enableSchemaEvolution = sinkConfiguration.isEnableSchemaEvolution(); + } + + @Override + public int getParallelInstanceId() { + return parallelInstanceId; + } + + @Override + public int getNumberOfParallelInstances() { + return numberOfParallelSubtasks; + } + + @Override + public boolean isEnableSchemaEvolution() { + return enableSchemaEvolution; + } + + @Override + public long processTime() { + return processingTimeService.getCurrentProcessingTime(); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java new file mode 100644 index 0000000..433d79c --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/KeyHashTopicRouter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one +
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.router; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import org.apache.pulsar.client.impl.Hash; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.client.util.MathUtils.signSafeMod; + +/** + * This implementation is used for the {@link TopicRoutingMode#MESSAGE_KEY_HASH} policy. It + * picks the topic by hashing the message key with the configured {@link MessageKeyHash}. If no + * message key is provided, a topic is picked at random. + * + * @param <IN> The type of the records being written to Pulsar.
+ */ +@Internal +public class KeyHashTopicRouter<IN> implements TopicRouter<IN> { + private static final long serialVersionUID = 2475614648095079804L; + + private final MessageKeyHash messageKeyHash; + + public KeyHashTopicRouter(SinkConfiguration sinkConfiguration) { + this.messageKeyHash = sinkConfiguration.getMessageKeyHash(); + } + + @Override + public String route(IN in, String key, List<String> partitions, PulsarSinkContext context) { + checkArgument( + !partitions.isEmpty(), + "You should provide topics for routing topic by message key hash."); + + int topicIndex; + if (Strings.isNullOrEmpty(key)) { + // No message key: pick a topic uniformly at random. + topicIndex = ThreadLocalRandom.current().nextInt(partitions.size()); + } else { + // Hash the key and map it onto the topic list with a sign-safe modulo. + Hash hash = messageKeyHash.getHash(); + int code = hash.makeHash(key); + topicIndex = signSafeMod(code, partitions.size()); + } + + return partitions.get(topicIndex); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java new file mode 100644 index 0000000..7f35760 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.router; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.description.InlineElement; + +import org.apache.pulsar.client.impl.Hash; +import org.apache.pulsar.client.impl.JavaStringHash; +import org.apache.pulsar.client.impl.Murmur3_32Hash; + +import static org.apache.flink.configuration.description.LinkElement.link; +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + +/** Predefined the available hash function for routing the message. */ +@PublicEvolving +public enum MessageKeyHash implements DescribedEnum { + + /** Use regular <code>String.hashCode()</code>. */ + JAVA_HASH( + "java-hash", + text( + "This hash would use %s to calculate the message key string's hash code.", + code("String.hashCode()"))) { + @Override + public Hash getHash() { + return JavaStringHash.getInstance(); + } + }, + /** + * Use Murmur3 hashing function. 
<a + * href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a> + */ + MURMUR3_32_HASH( + "murmur-3-32-hash", + text( + "This hash would calculate message key's hash code by using %s algorithm.", + link("https://en.wikipedia.org/wiki/MurmurHash", "Murmur3"))) { + @Override + public Hash getHash() { + return Murmur3_32Hash.getInstance(); + } + }; + + private final String name; + private final InlineElement desc; + + MessageKeyHash(String name, InlineElement desc) { + this.name = name; + this.desc = desc; + } + + @Internal + public abstract Hash getHash(); + + @Override + public String toString() { + return name; + } + + @Internal + @Override + public InlineElement getDescription() { + return desc; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java new file mode 100644 index 0000000..b9c654a --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/RoundRobinTopicRouter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.router; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument; + +/** + * If you choose the {@link TopicRoutingMode#ROUND_ROBIN} policy, we would use this implementation. + * We would pick the topic one by one in a fixed batch size. + * + * @param <IN> The message type which should write to Pulsar. + */ +@Internal +public class RoundRobinTopicRouter<IN> implements TopicRouter<IN> { + private static final long serialVersionUID = -1160533263474038206L; + + /** The internal counter for counting the messages. */ + private final AtomicLong counter = new AtomicLong(0); + + /** The size when we switch to another topic. */ + private final int partitionSwitchSize; + + public RoundRobinTopicRouter(SinkConfiguration configuration) { + this.partitionSwitchSize = configuration.getPartitionSwitchSize(); + } + + @Override + public String route(IN in, String key, List<String> partitions, PulsarSinkContext context) { + checkArgument( + !partitions.isEmpty(), + "You should provide topics for routing topic by message key hash."); + + long counts = counter.getAndAdd(1); + long index = (counts / partitionSwitchSize) % partitions.size(); + // Avoid digit overflow for message counter. 
+ int topicIndex = (int) (Math.abs(index) % Integer.MAX_VALUE); + + return partitions.get(topicIndex); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java new file mode 100644 index 0000000..a2c0589 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRouter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.router; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; + +import java.io.Serializable; +import java.util.List; + +/** + * The router for choosing the desired topic to write the Flink records. The user can implement this + * router for complex requirements. We have provided some easy-to-use implementations. + * + * <p>This topic router is stateless and doesn't have any initialize logic. Make sure you don't + * require some dynamic state. + * + * @param <IN> The record type needs to be written to Pulsar. + */ +@PublicEvolving +public interface TopicRouter<IN> extends Serializable { + + /** + * Choose the topic by given record & available partition list. You can return a new topic name + * if you need it. + * + * @param in The record instance which need to be written to Pulsar. + * @param key The key of the message from {@link PulsarMessageBuilder#key(String)}. It could be + * null, if message doesn't have a key. + * @param partitions The available partition list. This could be empty if you don't provide any + * topics in {@link PulsarSinkBuilder#setTopics(String...)}. You can return a custom topic, + * but make sure it should contain a partition index in naming. Using {@link + * TopicNameUtils#topicNameWithPartition(String, int)} can easily create a topic name with + * partition index. + * @param context The context contains useful information for determining the topic. + * @return The topic name to use. 
+ */ + String route(IN in, String key, List<String> partitions, PulsarSinkContext context); + + /** Implement this method if you have some non-serializable field. */ + default void open(SinkConfiguration sinkConfiguration) { + // Nothing to do by default. + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java new file mode 100644 index 0000000..c327435 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.router; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.description.InlineElement; + +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES; + +/** The routing policy for choosing the desired topic by the given message. */ +@PublicEvolving +public enum TopicRoutingMode implements DescribedEnum { + + /** + * The producer will publish messages across all partitions in a round-robin fashion to achieve + * maximum throughput. Please note that round-robin is not done per individual message but + * rather it's set to the same boundary of batching delay, to ensure batching is effective. + */ + ROUND_ROBIN( + "round-robin", + text( + "The producer will publish messages across all partitions in a round-robin fashion to achieve maximum throughput." + + " Please note that round-robin is not done per individual message" + + " but rather it's set to the same boundary of %s, to ensure batching is effective.", + code(PULSAR_BATCHING_MAX_MESSAGES.key()))), + + /** + * If no key is provided, The partitioned producer will randomly pick one single topic partition + * and publish all the messages into that partition. If a key is provided on the message, the + * partitioned producer will hash the key and assign the message to a particular partition. + */ + MESSAGE_KEY_HASH( + "message-key-hash", + text( + "If no key is provided, The partitioned producer will randomly pick one single topic partition" + + " and publish all the messages into that partition. 
If a key is provided on the message," + + " the partitioned producer will hash the key and assign the message to a particular partition.")), + + /** + * Use custom topic router implementation that will be called to determine the partition for a + * particular message. + */ + CUSTOM( + "custom", + text( + "Use custom %s implementation that will be called to determine the partition for a particular message.", + code(TopicRouter.class.getSimpleName()))); + + private final String name; + private final InlineElement desc; + + TopicRoutingMode(String name, InlineElement desc) { + this.name = name; + this.desc = desc; + } + + @Internal + @Override + public InlineElement getDescription() { + return desc; + } + + @Override + public String toString() { + return name; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java new file mode 100644 index 0000000..acd1c61 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.topic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; + +import org.apache.flink.shaded.guava30.com.google.common.base.Objects; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyList; +import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; + +/** + * We need the latest topic metadata for making sure the newly created topic partitions would be + * used by the Pulsar sink. This routing policy would be different compared with Pulsar Client + * built-in logic. We use Flink's ProcessingTimer as the executor. 
+ */ +@Internal +public class TopicMetadataListener implements Serializable, Closeable { + private static final long serialVersionUID = 6186948471557507522L; + + private static final Logger LOG = LoggerFactory.getLogger(TopicMetadataListener.class); + + private final ImmutableList<String> partitionedTopics; + private final Map<String, Integer> topicMetadata; + private volatile ImmutableList<String> availableTopics; + + // Dynamic fields. + private transient PulsarAdmin pulsarAdmin; + private transient Long topicMetadataRefreshInterval; + private transient ProcessingTimeService timeService; + + public TopicMetadataListener() { + this(emptyList()); + } + + public TopicMetadataListener(List<String> topics) { + List<String> partitions = new ArrayList<>(topics.size()); + Map<String, Integer> metadata = new HashMap<>(topics.size()); + for (String topic : topics) { + if (isPartitioned(topic)) { + partitions.add(topic); + } else { + // This would be updated when open writing. + metadata.put(topic, -1); + } + } + + this.partitionedTopics = ImmutableList.copyOf(partitions); + this.topicMetadata = metadata; + this.availableTopics = ImmutableList.of(); + } + + /** Register the topic metadata update in process time service. */ + public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) { + if (topicMetadata.isEmpty()) { + LOG.info("No topics have been provided, skip listener initialize."); + return; + } + + // Initialize listener properties. + this.pulsarAdmin = createAdmin(sinkConfiguration); + this.topicMetadataRefreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval(); + this.timeService = timeService; + + // Initialize the topic metadata. Quit if fail to connect to Pulsar. + sneakyAdmin(this::updateTopicMetadata); + + // Register time service. + triggerNextTopicMetadataUpdate(true); + } + + /** + * Return all the available topic partitions. We would recalculate the partitions if the topic + * metadata has been changed. 
Otherwise, we would return the cached result for better + * performance. + */ + public List<String> availableTopics() { + if (availableTopics.isEmpty() + && (!partitionedTopics.isEmpty() || !topicMetadata.isEmpty())) { + List<String> results = new ArrayList<>(); + for (Map.Entry<String, Integer> entry : topicMetadata.entrySet()) { + for (int i = 0; i < entry.getValue(); i++) { + results.add(topicNameWithPartition(entry.getKey(), i)); + } + } + + results.addAll(partitionedTopics); + this.availableTopics = ImmutableList.copyOf(results); + } + + return availableTopics; + } + + @Override + public void close() throws IOException { + if (pulsarAdmin != null) { + pulsarAdmin.close(); + } + } + + private void triggerNextTopicMetadataUpdate(boolean initial) { + if (!initial) { + // We should update the topic metadata, ignore the pulsar admin exception. + try { + updateTopicMetadata(); + } catch (PulsarAdminException e) { + LOG.warn("", e); + } + } + + // Register next timer. + long currentProcessingTime = timeService.getCurrentProcessingTime(); + long triggerTime = currentProcessingTime + topicMetadataRefreshInterval; + timeService.registerTimer(triggerTime, time -> triggerNextTopicMetadataUpdate(false)); + } + + private void updateTopicMetadata() throws PulsarAdminException { + boolean shouldUpdate = false; + + for (Map.Entry<String, Integer> entry : topicMetadata.entrySet()) { + String topic = entry.getKey(); + PartitionedTopicMetadata metadata = + pulsarAdmin.topics().getPartitionedTopicMetadata(topic); + + // Update topic metadata if it has been changed. + if (!Objects.equal(entry.getValue(), metadata.partitions)) { + entry.setValue(metadata.partitions); + shouldUpdate = true; + } + } + + // Clear available topics if the topic metadata has been changed. 
+ if (shouldUpdate) { + this.availableTopics = ImmutableList.of(); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java new file mode 100644 index 0000000..9bb1753 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.connector.pulsar.sink.writer.topic;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.io.Closer;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * All the Pulsar Producers share the same Client, but each one holds its own queue for a specified
 * topic, so different instances are created for different topics. Producers are cached per topic
 * and schema. Under exactly-once delivery, one transaction is kept per topic until the next
 * {@link #prepareCommit()}.
 */
@Internal
public class TopicProducerRegister implements Closeable {

    private final PulsarClient pulsarClient;
    private final SinkConfiguration sinkConfiguration;

    /** Topic name -> (schema -> producer) cache. */
    private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister;

    /** Topic name -> transaction opened since the last prepareCommit(). */
    private final Map<String, Transaction> transactionRegister;

    public TopicProducerRegister(SinkConfiguration sinkConfiguration) {
        this.pulsarClient = createClient(sinkConfiguration);
        this.sinkConfiguration = sinkConfiguration;
        this.producerRegister = new HashMap<>();
        this.transactionRegister = new HashMap<>();
    }

    /**
     * Creates a TypedMessageBuilder which can be sent to Pulsar directly. The producer for the
     * topic and schema is created on first use and cached. Under exactly-once delivery, the message
     * builder is bound to the topic's transaction, which is also created on demand.
     */
    public <T> TypedMessageBuilder<T> createMessageBuilder(String topic, Schema<T> schema) {
        Producer<T> producer = getOrCreateProducer(topic, schema);
        DeliveryGuarantee deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();

        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            Transaction transaction = getOrCreateTransaction(topic);
            return producer.newMessage(transaction);
        } else {
            return producer.newMessage();
        }
    }

    /**
     * Converts the open transactions into committables for the Pulsar committer. The transactions
     * are then dropped from this register; new ones are created lazily for subsequent messages.
     */
    public List<PulsarCommittable> prepareCommit() {
        List<PulsarCommittable> committables = new ArrayList<>(transactionRegister.size());
        transactionRegister.forEach(
                (topic, transaction) -> {
                    TxnID txnID = transaction.getTxnID();
                    PulsarCommittable committable = new PulsarCommittable(txnID, topic);
                    committables.add(committable);
                });

        clearTransactions();
        return committables;
    }

    /**
     * Flush all the messages buffered in the client and wait until all messages have been
     * successfully persisted.
     */
    public void flush() throws IOException {
        Collection<Map<SchemaInfo, Producer<?>>> collection = producerRegister.values();
        for (Map<SchemaInfo, Producer<?>> producers : collection) {
            for (Producer<?> producer : producers.values()) {
                producer.flush();
            }
        }
    }

    /**
     * Flushes pending messages, aborts open transactions, drops the producer cache and finally
     * closes the shared client.
     *
     * <p>Guava's {@link Closer} closes registered resources in LIFO order, so they are registered
     * in the reverse of the order they must run. Every step is attempted even if an earlier one
     * fails; the first failure is rethrown.
     */
    @Override
    public void close() throws IOException {
        try (Closer closer = Closer.create()) {
            // Runs last: all the producers are closed by this method, and we block until they
            // have been successfully closed.
            closer.register(pulsarClient);

            // Runs third: remove all the producers.
            closer.register(producerRegister::clear);

            // Runs second: abort all the existing transactions while the client is still open.
            closer.register(this::abortTransactions);

            // Runs first: flush all the pending messages to Pulsar.
            closer.register(this::flush);
        }
    }

    /** Create or return the cached topic-related producer. */
    @SuppressWarnings("unchecked")
    private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
        Map<SchemaInfo, Producer<?>> producers =
                producerRegister.computeIfAbsent(topic, key -> new HashMap<>());
        SchemaInfo schemaInfo = schema.getSchemaInfo();

        if (producers.containsKey(schemaInfo)) {
            // Safe: the producer stored under this SchemaInfo was created from a Schema<T>.
            return (Producer<T>) producers.get(schemaInfo);
        } else {
            ProducerBuilder<T> builder =
                    createProducerBuilder(pulsarClient, schema, sinkConfiguration);
            // Set the required topic name.
            builder.topic(topic);
            Producer<T> producer = sneakyClient(builder::create);
            producers.put(schemaInfo, producer);

            return producer;
        }
    }

    /**
     * Get the cached topic-related transaction, or create a new one, e.g. after a checkpoint has
     * handed the previous one to the committer.
     */
    private Transaction getOrCreateTransaction(String topic) {
        return transactionRegister.computeIfAbsent(
                topic,
                t -> {
                    long timeoutMillis = sinkConfiguration.getTransactionTimeoutMillis();
                    return createTransaction(pulsarClient, timeoutMillis);
                });
    }

    /** Abort the existing transactions. This method is used when closing the PulsarWriter. */
    private void abortTransactions() {
        if (transactionRegister.isEmpty()) {
            return;
        }

        TransactionCoordinatorClient coordinatorClient =
                ((PulsarClientImpl) pulsarClient).getTcClient();
        // This null check is used for making sure transaction is enabled in client.
        checkNotNull(coordinatorClient);

        try (Closer closer = Closer.create()) {
            for (Transaction transaction : transactionRegister.values()) {
                TxnID txnID = transaction.getTxnID();
                closer.register(() -> coordinatorClient.abort(txnID));
            }

            clearTransactions();
        } catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Clean these transactions. All transactions should be passed to Pulsar committer, we would
     * create new transaction when new message comes.
     */
    private void clearTransactions() {
        transactionRegister.clear();
    }
}