imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r800452443
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter; +import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.SerializableFunction; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; + +import static java.util.Collections.emptyList; +import static org.apache.flink.util.IOUtils.closeAll; + +/** + * This class is responsible to write records in a Pulsar topic and to handle the different delivery + * {@link DeliveryGuarantee}s. + * + * @param <IN> The type of the input elements. + */ +public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class); + + private final SinkConfiguration sinkConfiguration; + private final DeliveryGuarantee deliveryGuarantee; + private final PulsarSerializationSchema<IN> serializationSchema; + private final TopicRouter<IN> topicRouter; + private final PulsarSinkContextAdapter sinkContextAdapter; + private final TopicMetadataListener metadataListener; + private final MailboxExecutor mailboxExecutor; + private final TopicProducerRegister producerRegister; + private final Semaphore pendingMessages; + + /** + * Constructor creating a Pulsar writer. + * + * <p>It will throw a {@link RuntimeException} if {@link + * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)} + * fails. 
+ * + * @param sinkConfiguration the configuration to configure the Pulsar producer. + * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}. + * @param metadataListener the listener for querying topic metadata. + * @param topicRouterProvider create related topic router to choose topic by incoming records. + * @param initContext context to provide information about the runtime environment. + */ + public PulsarWriter( + SinkConfiguration sinkConfiguration, + PulsarSerializationSchema<IN> serializationSchema, + TopicMetadataListener metadataListener, + SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider, + InitContext initContext) { + this.sinkConfiguration = sinkConfiguration; + this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee(); + this.serializationSchema = serializationSchema; + this.topicRouter = topicRouterProvider.apply(sinkConfiguration); + this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration); + this.metadataListener = metadataListener; + this.mailboxExecutor = initContext.getMailboxExecutor(); + + // Initialize topic metadata listener. + LOG.debug("Initialize topic metadata after creating Pulsar writer."); + ProcessingTimeService timeService = initContext.getProcessingTimeService(); + metadataListener.open(sinkConfiguration, timeService); + + // Initialize topic router. + topicRouter.open(sinkConfiguration); + + // Initialize the serialization schema. + try { + InitializationContext initializationContext = + initContext.asSerializationSchemaInitializationContext(); + serializationSchema.open(initializationContext, sinkContextAdapter, sinkConfiguration); + } catch (Exception e) { + throw new FlinkRuntimeException("Cannot initialize schema.", e); + } + + // Create this producer register after opening serialization schema! 
+ this.producerRegister = new TopicProducerRegister(sinkConfiguration, serializationSchema); + this.pendingMessages = new Semaphore(sinkConfiguration.getMaxPendingMessages()); + } + + @Override + @SuppressWarnings("unchecked") + public void write(IN element, Context context) throws IOException, InterruptedException { + // Serialize the incoming element. + sinkContextAdapter.updateTimestamp(context); + RawMessage<byte[]> message = serializationSchema.serialize(element, sinkContextAdapter); + + // Choose the right topic to send. + List<String> availableTopics = metadataListener.availableTopics(); + String topic = topicRouter.route(element, message, availableTopics, sinkContextAdapter); + + // Create message builder for sending message. + TypedMessageBuilder<?> builder = createMessageBuilder(topic, deliveryGuarantee); Review comment: I think the situation here is similar to what we discussed for the source connector. Basically, the messages in pending transactions have already been sent to the Pulsar broker (they are just not visible to Pulsar consumers yet). If the job fails, we lose the handles to all the pending transactions. If we restart from the last checkpoint, what we can do is recreate a bunch of new transactions and resend the messages. The only downside of this approach is that the pending messages of the previous pending transactions in the Pulsar broker will not be cleared until those transactions time out. So in this case the current implementation is safe, as we are not attempting to reuse the same transactions across failures/checkpoints. However, this can raise another discussion. We previously discussed this with the Pulsar transaction developers, and we found that one approach is, for each checkpoint, to first create the transactions that will be used until the next checkpoint, and then store these TxnIds in the current checkpoint. 
In this case, let's say we have two checkpoints, c1 and c2: any messages sent between c1 and c2 will use one transaction id, let's say txnId1, and this txnId1 is stored in c1. Every time we restart from the last checkpoint, we can actually reuse the transactions. This way the pending messages stored in the Pulsar broker will not be wasted. There is an edge case: after a restart we get txnId1 back, but txnId1 may have timed out already (we can set the timeout to a very large number to avoid this, but let's assume the timeout can really happen for now). In this case we can create a new transaction, like the current implementation does. So to summarize: 1. The current implementation is correct; the only concern is that it might put extra burden on the Pulsar broker (e.g. the pending messages could prevent the Pulsar broker from clearing the log). 2. The proposed alternative is meant to alleviate that possible extra burden, and it falls back to the current implementation if the timeout still happens. The downside is that it introduces state to our sink writer. I personally feel that the current implementation is already good to go; the alternative can be discussed further to decide whether it is really needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org