JingGe commented on a change in pull request #18612: URL: https://github.com/apache/flink/pull/18612#discussion_r798580046
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java ########## @@ -65,7 +69,9 @@ * * @param <IN> The type of the input elements. */ -class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> { +class KafkaWriter<IN> Review comment: It seems that `StatefulSinkWriter` and `PrecommittingSinkWriter` could also be @Internal, if the implementation class, e.g. KafkaWriter, is always package-private. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java ########## @@ -86,45 +85,48 @@ return new KafkaSinkBuilder<>(); } + @Internal Review comment: 1. Why use @Internal? Users should have no access to the implementation class. 2. Are those @Internal @Override methods coming from the interface `StatefulSink` and `TwoPhaseCommittingSink` designed to always be used internally, which means every further connector impl, e.g. elastic, hbase, etc. must mark them as @Internal too? Is it possible to mark them as @Internal at the interface level to save the connector developer's effort and avoid potential human mistakes?
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java ########## @@ -185,15 +191,20 @@ public void write(IN element, Context context) throws IOException { } @Override - public List<KafkaCommittable> prepareCommit(boolean flush) { - if (deliveryGuarantee != DeliveryGuarantee.NONE || flush) { + public void flush(boolean endOfInput) throws IOException, InterruptedException { + LOG.debug("final commit={}", endOfInput); + if (deliveryGuarantee != DeliveryGuarantee.NONE || endOfInput) { Review comment: ```suggestion if (deliveryGuarantee != DeliveryGuarantee.NONE || endOfInput) { LOG.debug("final commit={}", endOfInput); ``` ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java ########## @@ -57,7 +54,9 @@ * @see KafkaSinkBuilder on how to construct a KafkaSink */ @PublicEvolving -public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> { +public class KafkaSink<IN> Review comment: Does package-private work here too? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/InitContextInitializationContextAdapter.java ########## @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.flink.api.connector.sink; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.Sink; Review comment: What is the reason for this change?
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java ########## @@ -86,45 +85,48 @@ return new KafkaSinkBuilder<>(); } + @Internal @Override - public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( - InitContext context, List<KafkaWriterState> states) throws IOException { - final Supplier<MetricGroup> metricGroupSupplier = - () -> context.metricGroup().addGroup("user"); - return new KafkaWriter<>( - deliveryGuarantee, - kafkaProducerConfig, - transactionalIdPrefix, - context, - recordSerializer, - new InitContextInitializationContextAdapter( - context.getUserCodeClassLoader(), metricGroupSupplier), - states); + public Committer<KafkaCommittable> createCommitter() throws IOException { + return new KafkaCommitter(kafkaProducerConfig); } + @Internal @Override - public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { - return Optional.of(new KafkaCommitter(kafkaProducerConfig)); + public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() { + return new KafkaCommittableSerializer(); Review comment: Should the getter always return a new instance?
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java ########## @@ -86,45 +85,48 @@ return new KafkaSinkBuilder<>(); } + @Internal @Override - public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter( - InitContext context, List<KafkaWriterState> states) throws IOException { - final Supplier<MetricGroup> metricGroupSupplier = - () -> context.metricGroup().addGroup("user"); - return new KafkaWriter<>( - deliveryGuarantee, - kafkaProducerConfig, - transactionalIdPrefix, - context, - recordSerializer, - new InitContextInitializationContextAdapter( - context.getUserCodeClassLoader(), metricGroupSupplier), - states); + public Committer<KafkaCommittable> createCommitter() throws IOException { + return new KafkaCommitter(kafkaProducerConfig); } + @Internal @Override - public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException { - return Optional.of(new KafkaCommitter(kafkaProducerConfig)); + public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() { + return new KafkaCommittableSerializer(); } + @Internal @Override - public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter() - throws IOException { - return Optional.empty(); - } - - @Override - public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() { - return Optional.of(new KafkaCommittableSerializer()); + public KafkaWriter<IN> createWriter(InitContext context) throws IOException { + return new KafkaWriter<IN>( + deliveryGuarantee, + kafkaProducerConfig, + transactionalIdPrefix, + context, + recordSerializer, + context.asSerializationSchemaInitializationContext(), + Collections.emptyList()); } + @Internal @Override - public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { - return Optional.empty(); + public KafkaWriter<IN> restoreWriter( + InitContext context, Collection<KafkaWriterState> recoveredState) 
throws IOException { + return new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + transactionalIdPrefix, + context, + recordSerializer, + context.asSerializationSchemaInitializationContext(), + recoveredState); } + @Internal @Override - public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() { - return Optional.of(new KafkaWriterStateSerializer()); + public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() { + return new KafkaWriterStateSerializer(); Review comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org