[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136427#comment-16136427 ]
ASF GitHub Bot commented on FLINK-6988:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134399048

--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around KafkaProducer that allows resuming transactions after a node failure, which makes
+ * it possible to implement the two-phase commit algorithm behind an exactly-once FlinkKafkaProducer.
+ *
+ * <p>On the happy path, usage is exactly the same as with
+ * {@link org.apache.kafka.clients.producer.KafkaProducer}. The user is expected to call:
+ *
+ * <ul>
+ * <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ * <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ * <li>{@link FlinkKafkaProducer#flush()}</li>
+ * <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>To actually implement two-phase commit, it must be possible to always commit a transaction
+ * after pre-committing it (here, the pre-commit is just a {@link FlinkKafkaProducer#flush()}).
+ * In case of a failure between {@link FlinkKafkaProducer#flush()} and
+ * {@link FlinkKafkaProducer#commitTransaction()}, this class allows resuming the interrupted
+ * transaction and committing it after a restart:
+ *
+ * <ul>
+ * <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ * <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ * <li>{@link FlinkKafkaProducer#flush()}</li>
+ * <li>{@link FlinkKafkaProducer#getProducerId()}</li>
+ * <li>{@link FlinkKafkaProducer#getEpoch()}</li>
+ * <li>node failure... restore producerId and epoch from state</li>
+ * <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
+ * <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces
+ * {@link FlinkKafkaProducer#initTransactions()} as the way to obtain the producerId and epoch
+ * counters. This is necessary because {@link FlinkKafkaProducer#initTransactions()} would
+ * otherwise automatically abort all ongoing transactions.
+ *
+ * <p>The second way this implementation differs from the reference
+ * {@link org.apache.kafka.clients.producer.KafkaProducer} is that it flushes new partitions on
+ * {@link FlinkKafkaProducer#flush()} instead of on {@link FlinkKafkaProducer#commitTransaction()}.
+ *
+ * <p>The last, minor difference is that it allows obtaining the producerId and epoch counters via
+ * the {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods
+ * (these are unfortunately private fields of the wrapped producer).
+ *
+ * <p>Those changes are compatible with Kafka's 0.11.0 API, although it clearly was not the
+ * intention of Kafka's API authors to make them possible.
+ *
+ * <p>Internally, this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer}
+ * and applies the required changes via the Java Reflection API. It might not be the prettiest
+ * solution, but the alternative would be to re-implement Kafka's whole 0.11 client on our own.
+ */
+public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+	private final KafkaProducer<K, V> kafkaProducer;
+	@Nullable
--- End diff --

nit: empty line before this field annotation.
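To make the recovery story in the Javadoc above concrete, here is a minimal sketch of the happy path followed by the resume-and-commit path. It assumes the wrapper exposes a Properties-based constructor like KafkaProducer's (the constructor is not visible in this diff hunk); the broker address, transactional id, topic, and serializers are placeholders.

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ResumeTransactionSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
		props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "flink-sink-tx-0"); // placeholder
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
				"org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				"org.apache.kafka.common.serialization.StringSerializer");

		FlinkKafkaProducer<String, String> producer = new FlinkKafkaProducer<>(props);

		// Happy path: the pre-commit is just flush().
		producer.initTransactions();
		producer.beginTransaction();
		producer.send(new ProducerRecord<>("output-topic", "key", "value"));
		producer.flush(); // pre-commit

		// Persist these two counters in checkpointed state before committing.
		long producerId = producer.getProducerId();
		short epoch = producer.getEpoch();

		// ...suppose the node fails here, between flush() and commitTransaction()...

		// After restart: do NOT call initTransactions() (it would abort the pending
		// transaction); resume it from the stored counters and commit.
		FlinkKafkaProducer<String, String> restored = new FlinkKafkaProducer<>(props);
		restored.resumeTransaction(producerId, epoch);
		restored.commitTransaction();
	}
}
```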
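The "private fields" remark refers to KafkaProducer keeping the producerId/epoch pair inside an internal transaction manager. A rough illustration of the kind of reflection such a wrapper needs is below; the field names (`transactionManager`, `producerIdAndEpoch`, `producerId`) follow the Kafka 0.11 client internals and should be treated as assumptions, since they are not part of any public contract.

```java
import java.lang.reflect.Field;

import org.apache.kafka.clients.producer.KafkaProducer;

final class ReflectionSketch {

	// Generic helper: read a private field from an object.
	private static Object getField(Object object, String fieldName) throws Exception {
		Field field = object.getClass().getDeclaredField(fieldName);
		field.setAccessible(true);
		return field.get(object);
	}

	// Reads the producerId that KafkaProducer keeps in its private transaction
	// manager. Field names are assumed from the Kafka 0.11 client internals.
	static long readProducerId(KafkaProducer<?, ?> producer) throws Exception {
		Object transactionManager = getField(producer, "transactionManager");
		Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
		return (long) getField(producerIdAndEpoch, "producerId");
	}
}
```

Reflection like this silently couples the wrapper to one Kafka version, which is presumably why the Javadoc calls it "not the prettiest solution".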
> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (to be released very soon) adds support for transactions.
> Thanks to that, Flink might be able to implement a Kafka sink supporting
> "exactly-once" semantics. The API changes and the transaction support as a
> whole are described in
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new
> FlinkKafkaProducer011 would:
> * upon creation, begin a transaction, store the transaction identifiers in
> state, and write all incoming data to an output Kafka topic using that
> transaction
> * on a `snapshotState` call, flush the data and record in state that the
> current transaction is pending to be committed
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`,
> either abort this pending transaction (if not every participant successfully
> saved its snapshot) or restore and commit it (sketched below)
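Those four bullets map naturally onto a two-phase commit skeleton. The following is a hypothetical sketch only: every class and method name here is invented for illustration, a real sink would implement Flink's CheckpointedFunction and CheckpointListener interfaces rather than the plain methods shown, and the producer underneath is assumed to be the FlinkKafkaProducer wrapper from the review above.

```java
import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical skeleton of the checkpoint lifecycle described in the issue.
class FlinkKafkaProducer011Sketch {

	private final FlinkKafkaProducer<byte[], byte[]> producer; // the wrapper from the diff
	private Long pendingProducerId; // would live in checkpointed state
	private Short pendingEpoch;     // would live in checkpointed state

	FlinkKafkaProducer011Sketch(FlinkKafkaProducer<byte[], byte[]> producer) {
		this.producer = producer;
		producer.initTransactions();
		producer.beginTransaction(); // upon creation: open the transaction
	}

	/** All incoming data goes to the output topic inside the open transaction. */
	void invoke(byte[] key, byte[] value) {
		producer.send(new ProducerRecord<>("output-topic", key, value));
	}

	/** snapshotState: flush (pre-commit) and mark the transaction as pending. */
	void snapshotState() {
		producer.flush();
		pendingProducerId = producer.getProducerId();
		pendingEpoch = producer.getEpoch();
	}

	/** notifyCheckpointComplete: second phase, commit the pending transaction. */
	void notifyCheckpointComplete() {
		producer.commitTransaction();
		pendingProducerId = null;
		pendingEpoch = null;
		producer.beginTransaction(); // start collecting data for the next checkpoint
	}

	/**
	 * Recovery after a crash between snapshotState() and notifyCheckpointComplete():
	 * resume the pending transaction from state, then commit it if the checkpoint
	 * completed everywhere, or abort it if it did not.
	 */
	void restore(long producerId, short epoch, boolean checkpointCompleted) {
		producer.resumeTransaction(producerId, epoch);
		if (checkpointCompleted) {
			producer.commitTransaction();
		} else {
			producer.abortTransaction();
		}
	}
}
```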