leonardBang commented on code in PR #4053:
URL: https://github.com/apache/flink-cdc/pull/4053#discussion_r2189288420


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRecordSerializer.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2;
+
+import com.alibaba.fluss.client.Connection;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Serializer to serialize the input record to a {@link FlussEvent} for {@link FlinkSinkWriter}.
+ *
+ * @param <INPUT>
+ */
+public interface FlussRecordSerializer<INPUT> extends Serializable {
+    void open(Connection connection) throws IOException;
+
+    FlussEvent serialize(INPUT t) throws IOException;

Review Comment:
   hint: name this parameter `INPUT in`, or use `T t` for the generic type



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/RowWithOp.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/* This file is based on source code of Apache Fluss Project (https://fluss.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A wrapper class that associates an {@link InternalRow} with an {@link OperationType} for use in
+ * Fluss-Flink data processing.
+ *
+ * <p>This class is used to represent a row of data along with its corresponding operation type,
+ * such as APPEND, UPSERT, or DELETE, as defined by {@link OperationType}.
+ *
+ * @see InternalRow
+ * @see OperationType
+ */
+public class RowWithOp {

Review Comment:
   Can we rename this to `FlussRowWithOp` to avoid confusion?
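
   A minimal sketch of what the renamed wrapper could look like. This is an illustration under assumptions, not the PR's code: `Object` stands in for `com.alibaba.fluss.row.InternalRow` so the snippet compiles on its own, the enum name `FlussOperationType` is hypothetical, and allowing a null row is only a guess based on the `@Nullable` import in the original file.

```java
import java.util.Objects;

// Hypothetical enum name; the constants are the ones listed in the OperationType Javadoc.
enum FlussOperationType {
    APPEND,
    UPSERT,
    DELETE,
    IGNORE
}

// Sketch only: Object stands in for com.alibaba.fluss.row.InternalRow so that
// this example compiles without the Fluss client on the classpath.
final class FlussRowWithOp {
    private final Object row; // assumption: may be null, e.g. for IGNORE
    private final FlussOperationType opType;

    FlussRowWithOp(Object row, FlussOperationType opType) {
        this.row = row;
        this.opType = Objects.requireNonNull(opType, "opType cannot be null");
    }

    Object getRow() {
        return row;
    }

    FlussOperationType getOperationType() {
        return opType;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FlussRowWithOp)) {
            return false;
        }
        FlussRowWithOp that = (FlussRowWithOp) o;
        return Objects.equals(row, that.row) && opType == that.opType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(row, opType);
    }
}
```

   The `Fluss` prefix makes it clear at call sites that the type belongs to this connector rather than to the Flink or Fluss client APIs.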



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussConfigUtils.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for parse fluss yaml sink options. */

Review Comment:
   hint: `for parsing` or `used to parse`



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRecordSerializer.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2;
+
+import com.alibaba.fluss.client.Connection;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Serializer to serialize the input record to a {@link FlussEvent} for {@link FlinkSinkWriter}.
+ *
+ * @param <INPUT>

Review Comment:
   Please add a description for the type parameter.
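
   One possible shape for the documented interface, as a sketch only. `Connection` and `FlussEvent` below are empty stand-ins for the real Fluss and connector types so the snippet compiles on its own, the Javadoc wording is a suggestion, and `StringSerializer` is a hypothetical implementation added purely to show usage.

```java
import java.io.IOException;
import java.io.Serializable;

// Stand-in for com.alibaba.fluss.client.Connection (assumption, for compilation only).
interface Connection {}

// Stand-in for the connector's FlussEvent (assumption, for compilation only).
final class FlussEvent {
    private final String payload;

    FlussEvent(String payload) {
        this.payload = payload;
    }

    String getPayload() {
        return payload;
    }
}

/**
 * Serializer that converts an input record to a {@link FlussEvent}.
 *
 * @param <INPUT> the type of the input records handled by this serializer
 */
interface FlussRecordSerializer<INPUT> extends Serializable {

    /**
     * Prepares the serializer before any record is processed.
     *
     * @param connection the Fluss connection the serializer may use for lookups
     */
    void open(Connection connection) throws IOException;

    /**
     * Converts one input record.
     *
     * @param in the input record to convert
     * @return the event to be written to Fluss
     */
    FlussEvent serialize(INPUT in) throws IOException;
}

// Hypothetical implementation, used only to demonstrate the interface.
final class StringSerializer implements FlussRecordSerializer<String> {
    @Override
    public void open(Connection connection) {
        // nothing to initialize in this sketch
    }

    @Override
    public FlussEvent serialize(String in) {
        return new FlussEvent(in);
    }
}
```

   Naming the parameter `in` also keeps it consistent with the `INPUT` type parameter.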



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlinkSinkWriter.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.metrics.FlinkMetricRegistry;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.table.api.ValidationException;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metrics.Gauge;
+import com.alibaba.fluss.metrics.Metric;
+import com.alibaba.fluss.metrics.MetricNames;
+import com.alibaba.fluss.row.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Base class for Flink {@link SinkWriter} implementations in Fluss. */
+public class FlinkSinkWriter<InputT> implements SinkWriter<InputT> {

Review Comment:
   hint: `FlussSinkWriter`



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlinkConversions.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.utils;
+
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.util.CollectionUtil;
+
+import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableDescriptor;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Converter from Flink's type to Fluss's type. */
+public class FlinkConversions {

Review Comment:
   minor: `FlussConversions`



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlinkSinkWriter.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.metrics.FlinkMetricRegistry;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.table.api.ValidationException;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metrics.Gauge;
+import com.alibaba.fluss.metrics.Metric;
+import com.alibaba.fluss.metrics.MetricNames;
+import com.alibaba.fluss.row.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Base class for Flink {@link SinkWriter} implementations in Fluss. */
+public class FlinkSinkWriter<InputT> implements SinkWriter<InputT> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkWriter.class);
+
+    private final Configuration flussConfig;
+    private final MailboxExecutor mailboxExecutor;
+    private final FlussRecordSerializer<InputT> flussRecordSerializer;
+
+    private transient Connection connection;
+    protected transient FlinkMetricRegistry flinkMetricRegistry;
+
+    protected transient SinkWriterMetricGroup metricGroup;
+
+    private transient Counter numRecordsOutCounter;
+    private transient Counter numRecordsOutErrorsCounter;
+    private volatile Throwable asyncWriterException;
+
+    private final Map<TablePath, TableWriter> writerMap;
+    private final Map<TablePath, Table> tableMap;
+
+    public FlinkSinkWriter(
+            Configuration flussConfig,
+            MailboxExecutor mailboxExecutor,
+            FlussRecordSerializer<InputT> flussRecordSerializer) {
+        this.flussConfig = flussConfig;
+        this.mailboxExecutor = mailboxExecutor;
+        this.flussRecordSerializer = flussRecordSerializer;
+        this.writerMap = new HashMap<>();
+        this.tableMap = new HashMap<>();
+    }
+
+    public void initialize(SinkWriterMetricGroup metricGroup) throws IOException {
+        LOG.info("Opening Fluss with config {}", flussConfig);
+        this.metricGroup = metricGroup;
+        flinkMetricRegistry =
+                new FlinkMetricRegistry(
+                        metricGroup, Collections.singleton(MetricNames.WRITER_SEND_LATENCY_MS));
+        connection = ConnectionFactory.createConnection(flussConfig, flinkMetricRegistry);
+        flussRecordSerializer.open(connection);
+
+        initMetrics();
+    }
+
+    protected void initMetrics() {
+        numRecordsOutCounter = metricGroup.getNumRecordsSendCounter();
+        numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
+        metricGroup.setCurrentSendTimeGauge(this::computeSendTime);
+    }
+
+    @Override
+    public void write(InputT inputValue, Context context) throws IOException {
+        checkAsyncException();
+
+        try {
+            FlussEvent flussEvent = flussRecordSerializer.serialize(inputValue);
+
+            TablePath tablePath = flussEvent.getTablePath();
+
+            if (flussEvent.isShouldRefreshSchema() || !writerMap.containsKey(tablePath)) {
+                // refresh table schema
+                if (tableMap.containsKey(tablePath)) {
+                    Table table = tableMap.remove(tablePath);
+                    writerMap.remove(tablePath);
+                    table.close();
+                }
+
+                Table table = connection.getTable(tablePath);
+                TableWriter writer;
+                if (table.getTableInfo().hasPrimaryKey()) {
+                    writer = table.newUpsert().createWriter();
+                } else {
+                    writer = table.newAppend().createWriter();
+                }
+                tableMap.put(tablePath, table);
+                writerMap.put(tablePath, writer);
+            }
+
+            List<RowWithOp> rowWithOps = flussEvent.getRowWithOps();
+            if (rowWithOps == null) {
+                return;
+            }
+            for (RowWithOp rowWithOp : rowWithOps) {
+                OperationType opType = rowWithOp.getOperationType();
+                InternalRow row = rowWithOp.getRow();
+                if (opType == OperationType.IGNORE) {
+                    // skip writing the row
+                    return;
+                }
+                CompletableFuture<?> writeFuture =
+                        write(writerMap.get(tablePath), opType, row, tablePath);
+                writeFuture.whenComplete(
+                        (ignored, throwable) -> {
+                            if (throwable != null) {
+                                if (this.asyncWriterException == null) {
+                                    this.asyncWriterException = throwable;
+                                }
+
+                                // Checking for exceptions from previous writes
+                                mailboxExecutor.execute(
+                                        this::checkAsyncException, "Update error metric");
+                            }
+                        });
+
+                numRecordsOutCounter.inc();
+            }
+
+        } catch (Exception e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    private CompletableFuture<?> write(
+            TableWriter writer, OperationType opType, InternalRow row, TablePath tablePath)
+            throws IOException {
+        if (writer instanceof UpsertWriter) {
+            UpsertWriter upsertWriter = (UpsertWriter) writer;
+            if (opType == OperationType.UPSERT) {
+                return upsertWriter.upsert(row);
+            } else if (opType == OperationType.DELETE) {
+                return upsertWriter.delete(row);
+            } else {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported operation type: %s for primary key table %s",
+                                opType, tablePath));
+            }
+        } else if (writer instanceof AppendWriter) {
+            AppendWriter appendWriter = (AppendWriter) writer;
+            if (opType == OperationType.APPEND) {
+                return appendWriter.append(row);
+            } else {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported operation type: %s for log table %s",
+                                opType, tablePath));
+            }
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported writer type: %s for table %s",
+                            writer.getClass(), tablePath));
+        }
+    }
+
+    public void flush(boolean endOfInput) throws IOException {
+        for (TableWriter writer : writerMap.values()) {
+            writer.flush();
+            checkAsyncException();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Closing Fluss sink function.");
+        try {
+            for (Table table : tableMap.values()) {
+                table.close();
+            }
+
+            tableMap.clear();
+
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            LOG.warn("Exception occurs while closing Fluss Connection.", e);
+        }
+        connection = null;
+
+        if (flinkMetricRegistry != null) {
+            flinkMetricRegistry.close();
+        }
+        flinkMetricRegistry = null;
+
+        // Rethrow exception for the case in which close is called before writer() and flush().
+        checkAsyncException();
+
+        LOG.info("Finished closing Fluss sink function.");
+    }
+
+    // todo: move sanityCheck to fluss.
+    private void sanityCheck(TableInfo flussTableInfo, boolean hasPrimaryKey) {
+        // when it's UpsertSinkWriter, it means it has primary key got from Flink's metadata
+        if (flussTableInfo.hasPrimaryKey() != hasPrimaryKey) {
+            throw new ValidationException(
+                    String.format(
+                            "Primary key constraint is not matched between metadata in Fluss (%s) and Flink (%s).",
+                            flussTableInfo.hasPrimaryKey(), hasPrimaryKey));
+        }
+        //        RowType currentTableRowType =

Review Comment:
   Please remove this commented-out code; you can post a Fluss JIRA link here instead.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/FlinkSink.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2.metrics;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlinkSinkWriter;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRecordSerializer;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+
+import com.alibaba.fluss.config.Configuration;
+
+import java.io.IOException;
+
+/* This file is based on source code of Apache Fluss Project (https://fluss.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Flink sink for Fluss. */
+public class FlinkSink<INPUT> implements Sink<INPUT> {

Review Comment:
   Why did you put this class under the `xx.metrics` package?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/FlinkSink.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2.metrics;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlinkSinkWriter;
+import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRecordSerializer;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+
+import com.alibaba.fluss.config.Configuration;
+
+import java.io.IOException;
+
+/* This file is based on source code of Apache Fluss Project (https://fluss.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Flink sink for Fluss. */
+public class FlinkSink<INPUT> implements Sink<INPUT> {

Review Comment:
   Note: we're in the Flink CDC project, so let's name this class `FlussSink` instead of `FlinkSink`.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/OperationType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.v2;
+
+/* This file is based on source code of Apache Fluss Project (https://fluss.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Enumeration of row operation types used in Fluss-Flink data processing.
+ *
+ * <p>This enum represents the type of operation associated with a row, such as an append (insert),
+ * upsert (update or insert), delete, or ignore. It is used to indicate how a row should be
+ * interpreted or processed in downstream systems.
+ *
+ * <ul>
+ *   <li>{@link #APPEND} - Represents an append-only (insert) operation.
+ *   <li>{@link #UPSERT} - Represents an upsert operation (update or insert).
+ *   <li>{@link #DELETE} - Represents a delete operation.
+ *   <li>{@link #IGNORE} - Represents an operation that should be ignored.
+ * </ul>
+ *
+ * @see com.alibaba.fluss.flink.row.RowWithOp
+ */
+public enum OperationType {

Review Comment:
   Can we rename this to `FlussOperationType` to avoid confusion?


