ruanhang1993 commented on code in PR #3904: URL: https://github.com/apache/flink-cdc/pull/3904#discussion_r2015840317
########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java: ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.output.ToStringConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** End-to-end tests for mysql cdc to Iceberg pipeline job. */ +public class MySqlToIcebergE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MySqlToIcebergE2eITCase.class); + + public static final Duration TESTCASE_TIMEOUT = Duration.ofMinutes(3); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases("mysql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL, "iceberg_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + private String warehouse; + + @BeforeClass + public static void initializeContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL)).join(); + LOG.info("Containers are started."); + } + + @Before + public void before() throws Exception { + LOG.info("Starting containers..."); + warehouse = temporaryFolder.newFolder(UUID.randomUUID().toString()).getPath(); + jobManagerConsumer = new ToStringConsumer(); + jobManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume)) + .withFileSystemBind(warehouse, warehouse, BindMode.READ_WRITE) + .withLogConsumer(jobManagerConsumer); + Startables.deepStart(Stream.of(jobManager)).join(); + LOG.info("JobManager is started."); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString()); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", warehouse); + + taskManagerConsumer = new ToStringConsumer(); + taskManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withVolumesFrom(jobManager, BindMode.READ_WRITE) + .withFileSystemBind(warehouse, warehouse, BindMode.READ_WRITE) + .withLogConsumer(taskManagerConsumer); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("TaskManager is started."); + runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString()); + runInContainerAsRoot(taskManager, "chmod", "0777", "-R", warehouse); + inventoryDatabase.createAndInitialize(); + } + + @After + public void after() { + super.after(); + inventoryDatabase.dropDatabase(); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String database = inventoryDatabase.getDatabaseName(); + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: iceberg\n" + + " catalog.properties.warehouse: %s\n" + + " catalog.properties.type: hadoop\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: 1", Review Comment: Please add a test with larger parallelism. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.actions.Actions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** An Operator to add checkpointId to IcebergTable Small file compaction. */ +public class CompactionOperator + extends AbstractStreamOperator<CommittableMessage<WriteResultWrapper>> + implements OneInputStreamOperator< + CommittableMessage<WriteResultWrapper>, CommittableMessage<WriteResultWrapper>> { + protected static final Logger LOGGER = LoggerFactory.getLogger(CompactionOperator.class); + + private final Map<String, String> catalogOptions; + + private Catalog catalog; + + /** store a list of MultiTableCommittable in one checkpoint. */ + private final Map<TableId, Integer> tableCommitTimes; + + private final Set<TableId> compactedTables; + + private final CompactionOptions compactionOptions; + + private volatile Throwable throwable; + + private ExecutorService compactExecutor; + + public CompactionOperator( + Map<String, String> catalogOptions, CompactionOptions compactionOptions) { + this.tableCommitTimes = new HashMap<>(); + this.compactedTables = new HashSet<>(); + this.catalogOptions = catalogOptions; + this.compactionOptions = compactionOptions; + } + + @Override + public void open() throws Exception { + super.open(); + if (compactExecutor == null) { + this.compactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-Cdc-Compaction")); + } + } + + @Override + public void processElement(StreamRecord<CommittableMessage<WriteResultWrapper>> element) { + if (element.getValue() instanceof CommittableWithLineage) { + TableId tableId = + ((CommittableWithLineage<WriteResultWrapper>) element.getValue()) + .getCommittable() + .getTableId(); + tableCommitTimes.put(tableId, tableCommitTimes.getOrDefault(tableId, 0) + 1); + int commitTimes = tableCommitTimes.get(tableId); + if (commitTimes >= compactionOptions.getCommitInterval() + && !compactedTables.contains(tableId)) { + if (throwable != null) { + throw new RuntimeException(throwable); Review Comment: Do we need to move this if statement to the start of this method `processElement` out of `if (element.getValue() instanceof CommittableWithLineage)`? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; + +/** Util class for {@link IcebergDataSink}. */ +public class IcebergTypeUtils { + + /** Convert column from CDC framework to Iceberg framework. */ + public static Types.NestedField convertCDCColumnToIcebergField( Review Comment: ```suggestion public static Types.NestedField convertCdcColumnToIcebergField( ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +/** A {@link MetadataApplier} for Apache Iceberg. */ +public class IcebergMetadataApplier implements MetadataApplier { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataApplier.class); + + private transient Catalog catalog; + + private final Map<String, String> catalogOptions; + + // currently, we set table options for all tables using the same options. + private final Map<String, String> tableOptions; + + private final Map<TableId, List<String>> partitionMaps; + + private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes; + + public IcebergMetadataApplier(Map<String, String> catalogOptions) { + this.catalogOptions = catalogOptions; + this.tableOptions = new HashMap<>(); + this.partitionMaps = new HashMap<>(); + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); Review Comment: ```suggestion this(catalogOptions, new HashMap<>(), new HashMap<>()); ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +/** A {@link MetadataApplier} for Apache Iceberg. */ +public class IcebergMetadataApplier implements MetadataApplier { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataApplier.class); + + private transient Catalog catalog; + + private final Map<String, String> catalogOptions; + + // currently, we set table options for all tables using the same options. + private final Map<String, String> tableOptions; + + private final Map<TableId, List<String>> partitionMaps; + + private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes; + + public IcebergMetadataApplier(Map<String, String> catalogOptions) { + this.catalogOptions = catalogOptions; + this.tableOptions = new HashMap<>(); + this.partitionMaps = new HashMap<>(); + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + } + + public IcebergMetadataApplier( + Map<String, String> catalogOptions, + Map<String, String> tableOptions, + Map<TableId, List<String>> partitionMaps) { + this.catalogOptions = catalogOptions; + this.tableOptions = tableOptions; + this.partitionMaps = partitionMaps; + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) + throws SchemaEvolveException { + if (catalog == null) { + catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-metadata-catalog", catalogOptions, new Configuration()); + } + SchemaChangeEventVisitor.visit( + schemaChangeEvent, + addColumnEvent -> { + applyAddColumn(addColumnEvent); + return null; + }, + alterColumnTypeEvent -> { + applyAlterColumnType(alterColumnTypeEvent); + return null; + }, + createTableEvent -> { + applyCreateTable(createTableEvent); + return null; + }, + dropColumnEvent -> { + applyDropColumn(dropColumnEvent); + return null; + }, + dropTableEvent -> { + throw new UnsupportedSchemaChangeEventException(dropTableEvent); + }, + renameColumnEvent -> { + applyRenameColumn(renameColumnEvent); + return null; + }, + truncateTableEvent -> { + throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + }); + } + + private void applyCreateTable(CreateTableEvent event) { + try { + TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier()); + + // Step1: Build Schema. + org.apache.flink.cdc.common.schema.Schema cdcSchema = event.getSchema(); + List<Types.NestedField> columns = new ArrayList<>(); + Set<Integer> identifierFieldIds = new HashSet<>(); + for (int index = 0; index < event.getSchema().getColumnCount(); index++) { + columns.add( + IcebergTypeUtils.convertCDCColumnToIcebergField( + index, (PhysicalColumn) cdcSchema.getColumns().get(index))); + if (cdcSchema.primaryKeys().contains(cdcSchema.getColumns().get(index).getName())) { + identifierFieldIds.add(index); + } + } + + // Step2: Build partition spec. + Schema icebergSchema = new Schema(columns, identifierFieldIds); + List<String> partitionColumns = cdcSchema.partitionKeys(); + if (partitionMaps.containsKey(event.tableId())) { + partitionColumns = partitionMaps.get(event.tableId()); + } + PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema); + for (String name : partitionColumns) { + // TODO Add more partition transforms, see + // https://iceberg.apache.org/spec/#partition-transforms. + builder.identity(name); + } + PartitionSpec partitionSpec = builder.build(); + if (!catalog.tableExists(tableIdentifier)) { + catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, tableOptions); + } + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyAddColumn(AddColumnEvent event) { + TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier()); + try { + Table table = catalog.loadTable(tableIdentifier); + applyAddColumnEventWithPosition(table, event); + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) + throws SchemaEvolveException { + + try { + UpdateSchema updateSchema = table.updateSchema(); + for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { + Column addColumn = columnWithPosition.getAddColumn(); Review Comment: ```suggestion Column addColumn = columnWithPosition.getAddColumn(); String columnName = addColumn.getName(); LogicalType logicalType = FlinkSchemaUtil.convert( DataTypeUtils.toFlinkDataType(addColumn.getType()) .getLogicalType()); ``` Use `columnName` and `logicalType` in following code. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.iceberg.sink.utils.OptionUtils; +import org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction.CompactionOptions; + +import java.time.ZoneId; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES; +import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES; + +/** A {@link DataSinkFactory} for Apache Iceberg. */ +public class IcebergDataSinkFactory implements DataSinkFactory { + + public static final String IDENTIFIER = "iceberg"; + + @Override + public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context) + .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES); + + Map<String, String> allOptions = context.getFactoryConfiguration().toMap(); + OptionUtils.printOptions(IDENTIFIER, allOptions); + + Map<String, String> catalogOptions = new HashMap<>(); + Map<String, String> tableOptions = new HashMap<>(); + allOptions.forEach( + (key, value) -> { + if (key.startsWith(PREFIX_TABLE_PROPERTIES)) { + tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value); + } else if (key.startsWith(IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) { + catalogOptions.put( + key.substring( + IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES.length()), + value); + } + }); + // Hard code to disable catalog cache as we will try to get latest schema after schema + // change. + catalogOptions.put("cache-enabled", "false"); + ZoneId zoneId = ZoneId.systemDefault(); + if (!Objects.equals( + context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE), + PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue())) { + zoneId = + ZoneId.of( + context.getPipelineConfiguration() + .get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE)); + } + String schemaOperatorUid = + context.getPipelineConfiguration() + .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID); + CompactionOptions compactionOptions = + getCompactionStrategy(context.getFactoryConfiguration()); + + return new IcebergDataSink( + catalogOptions, + tableOptions, + new HashMap<>(), + zoneId, + schemaOperatorUid, + compactionOptions); + } + + private CompactionOptions getCompactionStrategy(Configuration configuration) { + return CompactionOptions.builder() + .enabled(configuration.get(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED)) + .commitInterval( + configuration.get(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL)) + .parallelism(configuration.get(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM)) + .build(); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + return options; Review Comment: return ```suggestion return new HashSet<>(); ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; + +/** Util class for {@link IcebergDataSink}. */ Review Comment: ```suggestion /** Util class for types in {@link IcebergDataSink}. */ ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.v2; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.sink2.CommittingSinkWriter; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.io.TaskWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** A {@link SinkWriter} for Apache Iceberg. */ +public class IcebergWriter implements CommittingSinkWriter<Event, WriteResultWrapper> { + + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergWriter.class); + + public static final String DEFAULT_FILE_FORMAT = "parquet"; + + public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024; + + Map<TableId, RowDataTaskWriterFactory> writerFactoryMap; + + Map<TableId, TaskWriter<RowData>> writerMap; + + Map<TableId, TableSchemaWrapper> schemaMap; + + List<WriteResultWrapper> temporaryWriteResult; + + private final Catalog catalog; + + private final int taskId; + + private final int attemptId; + + private final ZoneId zoneId; + + public IcebergWriter( + Map<String, String> catalogOptions, int taskId, int attemptId, ZoneId zoneId) { + catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-writer-catalog", catalogOptions, new Configuration()); + writerFactoryMap = new HashMap<>(); + writerMap = new HashMap<>(); + schemaMap = new HashMap<>(); + temporaryWriteResult = new ArrayList<>(); + this.taskId = taskId; + this.attemptId = attemptId; + this.zoneId = zoneId; + } + + @Override + public Collection<WriteResultWrapper> prepareCommit() throws IOException, InterruptedException { + List<WriteResultWrapper> list = new ArrayList<>(); + list.addAll(temporaryWriteResult); + list.addAll(getWriteResult()); + temporaryWriteResult.clear(); + return list; + } + + private RowDataTaskWriterFactory getRowDataTaskWriterFactory(TableId tableId) { + Table table = catalog.loadTable(TableIdentifier.parse(tableId.identifier())); + RowType rowType = FlinkSchemaUtil.convert(table.schema()); + RowDataTaskWriterFactory rowDataTaskWriterFactory = + new RowDataTaskWriterFactory( + table, + rowType, + DEFAULT_MAX_FILE_SIZE, + FileFormat.fromString(DEFAULT_FILE_FORMAT), + new HashMap<>(), + new ArrayList<>(table.schema().identifierFieldIds()), + true); + rowDataTaskWriterFactory.initialize(taskId, attemptId); + return rowDataTaskWriterFactory; + } + + @Override + public void write(Event event, Context context) throws IOException, InterruptedException { + if (event instanceof DataChangeEvent) { + DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + TableId tableId = dataChangeEvent.tableId(); + writerFactoryMap.computeIfAbsent(tableId, this::getRowDataTaskWriterFactory); + TaskWriter<RowData> writer = + writerMap.computeIfAbsent( + tableId, tableId1 -> writerFactoryMap.get(tableId1).create()); + writer.write(schemaMap.get(tableId).convertEventToRowData(dataChangeEvent)); + } else { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; + TableId tableId = schemaChangeEvent.tableId(); + TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId); + + Schema newSchema = + tableSchemaWrapper != null + ? SchemaUtils.applySchemaChangeEvent( + tableSchemaWrapper.getSchema(), schemaChangeEvent) + : SchemaUtils.applySchemaChangeEvent(null, schemaChangeEvent); + schemaMap.put(tableId, new TableSchemaWrapper(newSchema, zoneId)); + } + } + + @Override + public void flush(boolean flush) throws IOException { + // Notice: flush method may be called many times during one checkpoint. + temporaryWriteResult.addAll(getWriteResult()); + } + + private List<WriteResultWrapper> getWriteResult() throws IOException { + List<WriteResultWrapper> writeResults = new ArrayList<>(); + for (Map.Entry<TableId, TaskWriter<RowData>> entry : writerMap.entrySet()) { + WriteResultWrapper writeResultWrapper = + new WriteResultWrapper(entry.getValue().complete(), entry.getKey()); + writeResults.add(writeResultWrapper); + LOGGER.info(writeResultWrapper.buildDescription()); + } + writerMap.clear(); + writerFactoryMap.clear(); + return writeResults; + } + + @Override + public void writeWatermark(Watermark watermark) {} + + @Override + public void close() throws Exception { + if (schemaMap != null) { + schemaMap.clear(); + schemaMap = null; Review Comment: Will `IcebergWriter` be reused? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; + +/** Util class for {@link IcebergDataSink}. */ +public class IcebergTypeUtils { + + /** Convert column from CDC framework to Iceberg framework. */ + public static Types.NestedField convertCDCColumnToIcebergField( + int index, PhysicalColumn column) { + DataType dataType = column.getType(); + return Types.NestedField.of( + index, + dataType.isNullable(), + column.getName(), + convertCDCTypeToIcebergType(dataType), + column.getComment()); + } + + /** + * Convert data type from CDC framework to Iceberg framework, refer to <a + * href="https://iceberg.apache.org/docs/nightly/flink/#flink-to-iceberg">...</a>. + */ + public static Type convertCDCTypeToIcebergType(DataType type) { + // ordered by type root definition + List<DataType> children = type.getChildren(); + int length = DataTypes.getLength(type).orElse(0); + int precision = DataTypes.getPrecision(type).orElse(0); + int scale = DataTypes.getScale(type).orElse(0); + switch (type.getTypeRoot()) { + case CHAR: + case VARCHAR: + return new Types.StringType(); + case BOOLEAN: + return new Types.BooleanType(); + case BINARY: + case VARBINARY: + return new Types.BinaryType(); + case DECIMAL: + return Types.DecimalType.of(precision, scale); + case TINYINT: + case SMALLINT: + case INTEGER: + return new Types.IntegerType(); + case DATE: + return new Types.DateType(); + case TIME_WITHOUT_TIME_ZONE: + return Types.TimeType.get(); + case BIGINT: + return new Types.LongType(); + case FLOAT: + return new Types.FloatType(); + case DOUBLE: + return new Types.DoubleType(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return Types.TimestampType.withoutZone(); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + return Types.TimestampType.withZone(); + default: + throw new IllegalArgumentException("Illegal type: " + type); Review Comment: ```suggestion throw new IllegalArgumentException("Unsupported cdc type in iceberg: " + type); ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; + +/** Util class for {@link IcebergDataSink}. */ +public class IcebergTypeUtils { + + /** Convert column from CDC framework to Iceberg framework. */ + public static Types.NestedField convertCDCColumnToIcebergField( + int index, PhysicalColumn column) { + DataType dataType = column.getType(); + return Types.NestedField.of( + index, + dataType.isNullable(), + column.getName(), + convertCDCTypeToIcebergType(dataType), + column.getComment()); + } + + /** + * Convert data type from CDC framework to Iceberg framework, refer to <a + * href="https://iceberg.apache.org/docs/nightly/flink/#flink-to-iceberg">...</a>. + */ + public static Type convertCDCTypeToIcebergType(DataType type) { + // ordered by type root definition + List<DataType> children = type.getChildren(); + int length = DataTypes.getLength(type).orElse(0); + int precision = DataTypes.getPrecision(type).orElse(0); + int scale = DataTypes.getScale(type).orElse(0); + switch (type.getTypeRoot()) { + case CHAR: + case VARCHAR: + return new Types.StringType(); + case BOOLEAN: + return new Types.BooleanType(); + case BINARY: + case VARBINARY: + return new Types.BinaryType(); + case DECIMAL: + return Types.DecimalType.of(precision, scale); + case TINYINT: + case SMALLINT: + case INTEGER: + return new Types.IntegerType(); + case DATE: + return new Types.DateType(); + case TIME_WITHOUT_TIME_ZONE: + return Types.TimeType.get(); + case BIGINT: + return new Types.LongType(); + case FLOAT: + return new Types.FloatType(); + case DOUBLE: + return new Types.DoubleType(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return Types.TimestampType.withoutZone(); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + return Types.TimestampType.withZone(); + default: + throw new IllegalArgumentException("Illegal type: " + type); + } + } + + public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) { + List<Column> columns = schema.getColumns(); + List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(columns.size()); + for (int i = 0; i < columns.size(); i++) { + fieldGetters.add( + IcebergTypeUtils.createFieldGetter(columns.get(i).getType(), i, zoneId)); + } + return fieldGetters; + } + + /** Create a {@link RecordData.FieldGetter} for the given {@link DataType}. */ + public static RecordData.FieldGetter createFieldGetter( + DataType fieldType, int fieldPos, ZoneId zoneId) { + final RecordData.FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = + row -> + org.apache.flink.table.data.StringData.fromString( + row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> { + DecimalData decimalData = + row.getDecimal(fieldPos, decimalPrecision, decimalScale); + return org.apache.flink.table.data.DecimalData.fromBigDecimal( + decimalData.toBigDecimal(), decimalPrecision, decimalScale); + }; + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case BIGINT: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case INTEGER: + case DATE: + fieldGetter = row -> row.getInt(fieldPos); + break; + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = + (row) -> { + LocalDateTime localDateTime = + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toLocalDateTime(); + return DateTimeUtil.microsFromTimestamp(localDateTime); + }; + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + fieldGetter = + (row) -> { + Instant instant = + row.getLocalZonedTimestampData( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toInstant(); + return DateTimeUtil.microsFromTimestamptz( + OffsetDateTime.ofInstant(instant, zoneId)); + }; + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: Review Comment: Is it right that `TIMESTAMP_WITH_LOCAL_TIME_ZONE` and `TIMESTAMP_WITH_TIME_ZONE` have the same code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
