JingGe commented on a change in pull request #18612:
URL: https://github.com/apache/flink/pull/18612#discussion_r798580046



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -65,7 +69,9 @@
  *
  * @param <IN> The type of the input elements.
  */
-class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> {
+class KafkaWriter<IN>

Review comment:
       It seems that `StatefulSinkWriter` and `PrecommittingSinkWriter` could also be marked `@Internal`, if the implementation class (e.g. `KafkaWriter`) is always package-private.
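As a self-contained illustration of the visibility argument (stand-in class names, not the actual Flink types): a class declared without an access modifier is package-private, so code outside its package cannot name it directly even when the interfaces it implements are public.

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-in for a public connector interface.
interface SinkWriterSketch {
    void write(String element);
}

// Stand-in for KafkaWriter: no access modifier means package-private,
// so only classes in the same package can reference this type directly.
class PackagePrivateWriter implements SinkWriterSketch {
    @Override
    public void write(String element) {
        // no-op for the sketch
    }
}

public class VisibilitySketch {
    public static void main(String[] args) {
        int mods = PackagePrivateWriter.class.getModifiers();
        // Package-private = neither public, protected, nor private.
        boolean packagePrivate = !Modifier.isPublic(mods)
                && !Modifier.isProtected(mods)
                && !Modifier.isPrivate(mods);
        System.out.println("writer is package-private: " + packagePrivate);
    }
}
```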

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
##########
@@ -86,45 +85,48 @@
         return new KafkaSinkBuilder<>();
     }
 
+    @Internal

Review comment:
       1. Why use `@Internal` here? Users should have no access to the implementation class anyway.
   2. Are those `@Internal` `@Override` methods coming from the interfaces `StatefulSink` and `TwoPhaseCommittingSink` designed to always be used internally, meaning every further connector implementation (e.g. Elasticsearch, HBase, etc.) must mark them as `@Internal` too? Is it possible to mark them as `@Internal` at the interface level, to save connector developers the effort and avoid potential human mistakes?
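The interface-level idea could be sketched as below; the `@Internal` annotation here is a locally declared stand-in for `org.apache.flink.annotation.Internal`, so the example is self-contained. Note that Java does not propagate interface annotations to implementing classes at runtime; the benefit is that the stability marker is declared once, where readers and API-compatibility tooling look for it, instead of being repeated on every override.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Collection;
import java.util.Collections;

// Local stand-in for org.apache.flink.annotation.Internal.
@Documented
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface Internal {}

// Annotating the interface once documents the contract for all implementors.
@Internal
interface PrecommittingSketch<CommT> {
    Collection<CommT> prepareCommit();
}

class WriterSketch implements PrecommittingSketch<String> {
    @Override // no per-override @Internal needed if the interface carries it
    public Collection<String> prepareCommit() {
        return Collections.emptyList();
    }
}

public class AnnotationSketch {
    public static void main(String[] args) {
        System.out.println("interface annotated: "
                + PrecommittingSketch.class.isAnnotationPresent(Internal.class));
        // Implementors are NOT annotated automatically; the marker is a
        // documentation/tooling convention, not an inherited runtime property.
        System.out.println("implementor annotated: "
                + WriterSketch.class.isAnnotationPresent(Internal.class));
    }
}
```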

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -185,15 +191,20 @@ public void write(IN element, Context context) throws IOException {
     }
 
     @Override
-    public List<KafkaCommittable> prepareCommit(boolean flush) {
-        if (deliveryGuarantee != DeliveryGuarantee.NONE || flush) {
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        LOG.debug("final commit={}", endOfInput);
+        if (deliveryGuarantee != DeliveryGuarantee.NONE || endOfInput) {

Review comment:
       ```suggestion
           if (deliveryGuarantee != DeliveryGuarantee.NONE || endOfInput) {
               LOG.debug("final commit={}", endOfInput);
   ```
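Applied to the method body, the suggestion reads roughly as below. This is a stand-in sketch (the enum and counter replace the real producer machinery, and the checked exceptions of the real method are omitted), showing that the debug line then only fires when a flush actually happens.

```java
enum DeliveryGuarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

class FlushSketch {
    private final DeliveryGuarantee deliveryGuarantee;
    private int flushes = 0; // stand-in for currentProducer.flush()

    FlushSketch(DeliveryGuarantee guarantee) {
        this.deliveryGuarantee = guarantee;
    }

    public void flush(boolean endOfInput) {
        if (deliveryGuarantee != DeliveryGuarantee.NONE || endOfInput) {
            // Logged only when we actually flush, per the suggestion.
            System.out.println("final commit=" + endOfInput);
            flushes++;
        }
    }

    int flushCount() {
        return flushes;
    }

    public static void main(String[] args) {
        FlushSketch sketch = new FlushSketch(DeliveryGuarantee.NONE);
        sketch.flush(false); // NONE and not end of input: no flush, no log
        sketch.flush(true);  // end of input forces the flush and the log line
        System.out.println("flushes=" + sketch.flushCount());
    }
}
```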

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
##########
@@ -57,7 +54,9 @@
  * @see KafkaSinkBuilder on how to construct a KafkaSink
  */
 @PublicEvolving
-public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> {
+public class KafkaSink<IN>

Review comment:
       Does package-private work here too?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/InitContextInitializationContextAdapter.java
##########
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.sink;
+package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;

Review comment:
       What is the reason for this change?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
##########
@@ -86,45 +85,48 @@
         return new KafkaSinkBuilder<>();
     }
 
+    @Internal
     @Override
-    public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter(
-            InitContext context, List<KafkaWriterState> states) throws IOException {
-        final Supplier<MetricGroup> metricGroupSupplier =
-                () -> context.metricGroup().addGroup("user");
-        return new KafkaWriter<>(
-                deliveryGuarantee,
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context,
-                recordSerializer,
-                new InitContextInitializationContextAdapter(
-                        context.getUserCodeClassLoader(), metricGroupSupplier),
-                states);
+    public Committer<KafkaCommittable> createCommitter() throws IOException {
+        return new KafkaCommitter(kafkaProducerConfig);
     }
 
+    @Internal
     @Override
-    public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException {
-        return Optional.of(new KafkaCommitter(kafkaProducerConfig));
+    public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
+        return new KafkaCommittableSerializer();

Review comment:
       Should the getter always return a new instance?
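If the serializer is stateless, the getter could hand out one shared instance instead of allocating on every call. A minimal sketch of the cached-instance pattern (illustrative names, not the actual Flink classes):

```java
import java.nio.charset.StandardCharsets;

// Illustrative stand-in for a stateless committable serializer.
class CommittableSerializerSketch {
    byte[] serialize(String committable) {
        return committable.getBytes(StandardCharsets.UTF_8);
    }
}

class SinkSketch {
    // Created once and shared: safe because the serializer holds no mutable state.
    private static final CommittableSerializerSketch SERIALIZER =
            new CommittableSerializerSketch();

    public CommittableSerializerSketch getCommittableSerializer() {
        return SERIALIZER;
    }

    public static void main(String[] args) {
        SinkSketch sink = new SinkSketch();
        // Repeated calls hand back the same object rather than a fresh allocation.
        System.out.println(
                sink.getCommittableSerializer() == sink.getCommittableSerializer());
    }
}
```

Whether the saving matters depends on how often the runtime invokes the getter; returning a new instance is also correct for a cheap stateless serializer, so this is a micro-optimization at most.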

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
##########
@@ -86,45 +85,48 @@
         return new KafkaSinkBuilder<>();
     }
 
+    @Internal
     @Override
-    public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter(
-            InitContext context, List<KafkaWriterState> states) throws IOException {
-        final Supplier<MetricGroup> metricGroupSupplier =
-                () -> context.metricGroup().addGroup("user");
-        return new KafkaWriter<>(
-                deliveryGuarantee,
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context,
-                recordSerializer,
-                new InitContextInitializationContextAdapter(
-                        context.getUserCodeClassLoader(), metricGroupSupplier),
-                states);
+    public Committer<KafkaCommittable> createCommitter() throws IOException {
+        return new KafkaCommitter(kafkaProducerConfig);
     }
 
+    @Internal
     @Override
-    public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException {
-        return Optional.of(new KafkaCommitter(kafkaProducerConfig));
+    public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
+        return new KafkaCommittableSerializer();
     }
 
+    @Internal
     @Override
-    public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter()
-            throws IOException {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<KafkaCommittable>> getCommittableSerializer() {
-        return Optional.of(new KafkaCommittableSerializer());
+    public KafkaWriter<IN> createWriter(InitContext context) throws IOException {
+        return new KafkaWriter<IN>(
+                deliveryGuarantee,
+                kafkaProducerConfig,
+                transactionalIdPrefix,
+                context,
+                recordSerializer,
+                context.asSerializationSchemaInitializationContext(),
+                Collections.emptyList());
     }
 
+    @Internal
     @Override
-    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
-        return Optional.empty();
+    public KafkaWriter<IN> restoreWriter(
+            InitContext context, Collection<KafkaWriterState> recoveredState) throws IOException {
+        return new KafkaWriter<>(
+                deliveryGuarantee,
+                kafkaProducerConfig,
+                transactionalIdPrefix,
+                context,
+                recordSerializer,
+                context.asSerializationSchemaInitializationContext(),
+                recoveredState);
     }
 
+    @Internal
     @Override
-    public Optional<SimpleVersionedSerializer<KafkaWriterState>> getWriterStateSerializer() {
-        return Optional.of(new KafkaWriterStateSerializer());
+    public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
+        return new KafkaWriterStateSerializer();

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.