imaffe commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r800452443



##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicRouter<IN> topicRouter;
+    private final PulsarSinkContextAdapter sinkContextAdapter;
+    private final TopicMetadataListener metadataListener;
+    private final MailboxExecutor mailboxExecutor;
+    private final TopicProducerRegister producerRegister;
+    private final Semaphore pendingMessages;
+
+    /**
+     * Constructor creating a Pulsar writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
+     * fails.
+     *
+     * @param sinkConfiguration the configuration to configure the Pulsar producer.
+     * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}.
+     * @param metadataListener the listener for querying topic metadata.
+     * @param topicRouterProvider create related topic router to choose topic by incoming records.
+     * @param initContext context to provide information about the runtime environment.
+     */
+    public PulsarWriter(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider,
+            InitContext initContext) {
+        this.sinkConfiguration = sinkConfiguration;
+        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+        this.serializationSchema = serializationSchema;
+        this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+        this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration);
+        this.metadataListener = metadataListener;
+        this.mailboxExecutor = initContext.getMailboxExecutor();
+
+        // Initialize topic metadata listener.
+        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+        ProcessingTimeService timeService = initContext.getProcessingTimeService();
+        metadataListener.open(sinkConfiguration, timeService);
+
+        // Initialize topic router.
+        topicRouter.open(sinkConfiguration);
+
+        // Initialize the serialization schema.
+        try {
+            InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            serializationSchema.open(initializationContext, sinkContextAdapter, sinkConfiguration);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        // Create this producer register after opening serialization schema!
+        this.producerRegister = new TopicProducerRegister(sinkConfiguration, serializationSchema);
+        this.pendingMessages = new Semaphore(sinkConfiguration.getMaxPendingMessages());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(IN element, Context context) throws IOException, InterruptedException {
+        // Serialize the incoming element.
+        sinkContextAdapter.updateTimestamp(context);
+        RawMessage<byte[]> message = serializationSchema.serialize(element, sinkContextAdapter);
+
+        // Choose the right topic to send.
+        List<String> availableTopics = metadataListener.availableTopics();
+        String topic = topicRouter.route(element, message, availableTopics, sinkContextAdapter);
+
+        // Create message builder for sending message.
+        TypedMessageBuilder<?> builder = createMessageBuilder(topic, deliveryGuarantee);

Review comment:
   I think the situation here is similar to what we discussed for the source connector. The messages in pending transactions have already been sent to the Pulsar broker (they are just not visible to Pulsar consumers yet). If the job fails, we lose the handles to all the pending transactions. When we restart from the last checkpoint, what we can do is create a batch of new transactions and resend the messages. The only downside of this approach is that the pending messages of the previous transactions will not be cleared from the Pulsar broker until the transaction timeout elapses. So the current implementation is safe, because we never attempt to reuse the same transactions across failures/checkpoints.
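   To make this concrete, here is a minimal sketch using the raw Pulsar client API (not the connector code); the service URL and topic name are placeholders. A fresh transaction is built per checkpoint cycle, and if the job fails before commit, nobody holds the handle anymore, so the broker keeps the pending messages only until the transaction timeout elapses:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;

public class FreshTransactionSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical broker URL; transactions must be enabled on the client.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .enableTransaction(true)
                .build();

        // A fresh transaction per checkpoint cycle. The timeout bounds how
        // long an orphaned (never committed) transaction keeps its pending
        // messages on the broker after a failure.
        Transaction txn = client.newTransaction()
                .withTransactionTimeout(10, TimeUnit.MINUTES)
                .build()
                .get();

        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/example") // hypothetical topic
                .create();

        // Messages sent under the transaction stay invisible to consumers
        // until commit() is called after the checkpoint completes.
        producer.newMessage(txn)
                .value("payload".getBytes(StandardCharsets.UTF_8))
                .send();
        txn.commit().get();

        producer.close();
        client.close();
    }
}
```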
   
   However, this raises another discussion. We previously discussed this with the Pulsar transaction developers, and one approach we found is: for each checkpoint, first create the transactions that will be used until the next checkpoint, and then store their TxnIDs in the current checkpoint. For example, with two checkpoints c1 and c2, any message sent between c1 and c2 uses a transaction id, say txnId1, and txnId1 is stored in c1. Every time we restart from the last checkpoint, we can then reuse those transactions, so the pending messages already stored on the Pulsar broker are not wasted.
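   A hedged sketch of that idea follows; the `CheckpointedTxn` class is hypothetical, not connector code. At checkpoint cN we pre-build the transaction covering the cN..cN+1 interval and persist only its TxnID, since the `Transaction` handle itself is not serializable:

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;

// Hypothetical state class: captures the two long halves of a TxnID so
// the id can live inside checkpointed writer state.
final class CheckpointedTxn implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long mostSigBits;
    private final long leastSigBits;

    CheckpointedTxn(TxnID txnID) {
        this.mostSigBits = txnID.getMostSigBits();
        this.leastSigBits = txnID.getLeastSigBits();
    }

    TxnID toTxnID() {
        return new TxnID(mostSigBits, leastSigBits);
    }

    // Called while taking checkpoint cN: pre-create the transaction that
    // will cover the cN..cN+1 interval and persist only its id.
    static CheckpointedTxn snapshotNextTransaction(PulsarClient client) throws Exception {
        Transaction next = client.newTransaction()
                .withTransactionTimeout(1, TimeUnit.HOURS) // deliberately large
                .build()
                .get();
        return new CheckpointedTxn(next.getTxnID());
    }
}
```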
   
   There is an edge case: after a restart we may recover txnId1, but that transaction may already have timed out (we can set the timeout to a very large value to make this unlikely, but let's assume it can happen). In that case we can fall back to creating a new transaction, just like the current implementation does.
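   A sketch of that fallback path; note that `resumeTransaction` is a hypothetical helper, since the public Pulsar client API does not expose re-attaching to an existing TxnID, so a real version would need support from the client internals or the transaction coordinator:

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;

final class TxnRecovery {

    // Try to resume the transaction restored from the checkpoint; if the
    // coordinator no longer knows it (timed out or aborted), fall back to
    // a brand-new transaction, exactly as the current implementation does.
    static Transaction obtainTransaction(PulsarClient client, TxnID restored) throws Exception {
        try {
            return resumeTransaction(client, restored);
        } catch (TransactionCoordinatorClientException e) {
            return client.newTransaction()
                    .withTransactionTimeout(1, TimeUnit.HOURS)
                    .build()
                    .get();
        }
    }

    // Hypothetical helper: no public API call exists for re-attaching to
    // an existing TxnID, so this placeholder always signals failure.
    private static Transaction resumeTransaction(PulsarClient client, TxnID txnID)
            throws TransactionCoordinatorClientException {
        throw new TransactionCoordinatorClientException("not implemented: " + txnID);
    }
}
```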
   
   So to summarize:
   1. The current implementation is correct; the only concern is that it might put extra burden on the Pulsar broker (e.g. the pending messages could prevent the broker from clearing the log).
   2. The proposed alternative alleviates that possible extra burden, and it falls back to the current implementation if the timeout still happens. The downside is that it introduces state into our sink writer.
   
   I personally feel the current implementation is already good to go; the alternative can be discussed further to decide whether it is really needed.



