loserwang1024 commented on code in PR #4047:
URL: https://github.com/apache/flink-cdc/pull/4047#discussion_r2156455532
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussUtils.java:
##########
@@ -0,0 +1,99 @@
+package org.apache.flink.cdc.connectors.fluss.utils;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.types.DataType;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
+
+public class FlussUtils {
+ /** Format DATE type data. */
+ private static final DateTimeFormatter DATE_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+ /** Format timestamp-related type data. */
+ private static final DateTimeFormatter DATETIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ public static RecordData.FieldGetter createFieldGetter(
+ DataType fieldType, int fieldPos, ZoneId zoneId) {
+ final RecordData.FieldGetter fieldGetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ fieldGetter = record -> record.getBoolean(fieldPos);
+ break;
+ case TINYINT:
+ fieldGetter = record -> record.getByte(fieldPos);
+ break;
+ case SMALLINT:
+ fieldGetter = record -> record.getShort(fieldPos);
+ break;
+ case INTEGER:
+ fieldGetter = record -> record.getInt(fieldPos);
+ break;
+ case BIGINT:
+ fieldGetter = record -> record.getLong(fieldPos);
+ break;
+ case FLOAT:
+ fieldGetter = record -> record.getFloat(fieldPos);
+ break;
+ case DOUBLE:
+ fieldGetter = record -> record.getDouble(fieldPos);
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldGetter =
Review Comment:
Why call `toBigDecimal()` again?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussUtils.java:
##########
@@ -0,0 +1,99 @@
+package org.apache.flink.cdc.connectors.fluss.utils;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.types.DataType;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
+
+public class FlussUtils {
+ /** Format DATE type data. */
+ private static final DateTimeFormatter DATE_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+ /** Format timestamp-related type data. */
+ private static final DateTimeFormatter DATETIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ public static RecordData.FieldGetter createFieldGetter(
+ DataType fieldType, int fieldPos, ZoneId zoneId) {
+ final RecordData.FieldGetter fieldGetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ fieldGetter = record -> record.getBoolean(fieldPos);
+ break;
+ case TINYINT:
+ fieldGetter = record -> record.getByte(fieldPos);
+ break;
+ case SMALLINT:
+ fieldGetter = record -> record.getShort(fieldPos);
+ break;
+ case INTEGER:
+ fieldGetter = record -> record.getInt(fieldPos);
+ break;
+ case BIGINT:
+ fieldGetter = record -> record.getLong(fieldPos);
+ break;
+ case FLOAT:
+ fieldGetter = record -> record.getFloat(fieldPos);
+ break;
+ case DOUBLE:
+ fieldGetter = record -> record.getDouble(fieldPos);
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldGetter =
+ record ->
+ record.getDecimal(fieldPos, decimalPrecision,
decimalScale)
+ .toBigDecimal();
+ break;
+ case CHAR:
+ case VARCHAR:
+ fieldGetter = record -> record.getString(fieldPos).toString();
Review Comment:
Why call `toString()` again?
```suggestion
fieldGetter = record -> record.getString(fieldPos);
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java:
##########
@@ -0,0 +1,40 @@
+package org.apache.flink.cdc.connectors.fluss.sink;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.connectors.fluss.config.FlussSinkOptions;
+
+import com.alibaba.fluss.flink.sink.FlussSink;
+
+import java.time.ZoneId;
+
+public class FlussDataSink implements DataSink {
+ private final FlussSinkOptions flssOptions;
+
+ private final ZoneId zoneId;
+
+ public FlussDataSink(FlussSinkOptions flssOptions, ZoneId zoneId) {
+ this.flssOptions = flssOptions;
+ this.zoneId = zoneId;
+ }
+
+ @Override
+ public EventSinkProvider getEventSinkProvider() {
+ return FlinkSinkProvider.of(
+ FlussSink.<Event>builder()
+ .setBootstrapServers(flssOptions.getBootstrapServers())
+ .setDatabase(flssOptions.getDatabase())
+ .setTable(flssOptions.getTable())
Review Comment:
We should pay attention to one thing: FlussSink assumes that the table has
already been created!
1. When `FlussSink.build` is invoked, it gets the table from
`admin.getTableInfo(tablePath).get();`. That doesn't work for the YAML sink:
when we start the job, no table exists yet!
2. The shuffle logic and FlinkSinkWriter are designed for a single table, not
for all the tables of a whole database.
Thus, we should redesign the sink logic in the CDC pipeline. If possible, we
can merge this into the Fluss repo later.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+
+import com.alibaba.fluss.flink.row.RowWithOp;
+import com.alibaba.fluss.flink.sink.serializer.FlussSerializationSchema;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.flink.row.OperationType.DELETE;
+import static com.alibaba.fluss.flink.row.OperationType.UPSERT;
+
+public class FlussEventSerializationSchema implements
FlussSerializationSchema<Event> {
+ private static final long serialVersionUID = 1L;
+
+ private transient Map<TableId, TableSchemaInfo> tableInfoMap;
+ private final ZoneId zoneId;
+
+ public FlussEventSerializationSchema(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ }
+
+ @Override
+ public void open(InitializationContext initializationContext) throws
Exception {
+ this.tableInfoMap = new HashMap<>();
+ }
+
+ @Override
+ public RowWithOp serialize(Event record) throws Exception {
+ if (record instanceof SchemaChangeEvent) {
+ applySchemaChangeEvent((SchemaChangeEvent) record);
+ return null;
+ } else if (record instanceof DataChangeEvent) {
+ return applyDataChangeEvent((DataChangeEvent) record);
+ } else {
+ throw new UnsupportedOperationException("Don't support event " +
record);
+ }
+ }
+
+ private void applySchemaChangeEvent(SchemaChangeEvent event) {
+ TableId tableId = event.tableId();
+ org.apache.flink.cdc.common.schema.Schema newSchema;
+ if (event instanceof CreateTableEvent) {
+ newSchema = ((CreateTableEvent) event).getSchema();
+ } else {
+ TableSchemaInfo tableInfo = tableInfoMap.get(tableId);
Review Comment:
Throw an exception for now, because Fluss does not support any schema change
yet. We can apply this once Fluss supports it in V0.8.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]