cshuo commented on code in PR #4164:
URL: https://github.com/apache/flink-cdc/pull/4164#discussion_r2472357187


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/MultiTableEventStreamWriteFunction.java:
##########
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.function;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.HudiRecordEventSerializer;
+import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
+import org.apache.flink.cdc.runtime.serializer.schema.SchemaSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ViewStorageProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Multi-table wrapper function that routes events to table-specific EventBucketStreamWriteFunction
+ * instances. This approach maintains table isolation by creating dedicated function instances per
+ * table while keeping the core write functions single-table focused.
+ */
+public class MultiTableEventStreamWriteFunction extends AbstractStreamWriteFunction<Event>
+        implements EventProcessorFunction {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiTableEventStreamWriteFunction.class);
+
+    /** Table-specific write functions created dynamically when new tables are encountered. */
+    private transient Map<TableId, EventBucketStreamWriteFunction> tableFunctions;
+
+    /** Track tables that have been initialized to avoid duplicate initialization. */
+    private transient Map<TableId, Boolean> initializedTables;
+
+    /** Cache of schemas per table for RowType generation. */
+    private transient Map<TableId, Schema> schemaMaps;
+
+    /** Persistent state for schemas to survive checkpoints/savepoints. */
+    private transient ListState<Tuple2<TableId, Schema>> schemaState;
+
+    private transient Map<TableId, Configuration> tableConfigurations;
+
+    /** Schema evolution client to communicate with SchemaOperator. */
+    private transient SchemaEvolutionClient schemaEvolutionClient;
+
+    /** Store the function initialization context for table functions. */
+    private transient FunctionInitializationContext functionInitializationContext;
+
+    public MultiTableEventStreamWriteFunction(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        super.initializeState(context);
+        this.functionInitializationContext = context;
+
+        // Initialize schema map before restoring state
+        if (this.schemaMaps == null) {
+            this.schemaMaps = new HashMap<>();
+        }
+
+        // Initialize schema state for persistence across checkpoints/savepoints
+        // Using operator state since this is not a keyed stream
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        TupleSerializer<Tuple2<TableId, Schema>> tupleSerializer =
+                new TupleSerializer(
+                        Tuple2.class,
+                        new org.apache.flink.api.common.typeutils.TypeSerializer[] {
+                            TableIdSerializer.INSTANCE, SchemaSerializer.INSTANCE
+                        });
+        ListStateDescriptor<Tuple2<TableId, Schema>> schemaStateDescriptor =
+                new ListStateDescriptor<>("schemaState", tupleSerializer);
+        this.schemaState = context.getOperatorStateStore().getUnionListState(schemaStateDescriptor);
+
+        // Restore schemas from state if this is a restore operation
+        if (context.isRestored()) {
+            LOG.info("Restoring schemas from state");
+            for (Tuple2<TableId, Schema> entry : schemaState.get()) {
+                schemaMaps.put(entry.f0, entry.f1);
+                LOG.info("Restored schema for table: {}", entry.f0);
+            }
+            LOG.info("Restored {} schemas from state", schemaMaps.size());
+        }
+
+        LOG.info("MultiTableEventStreamWriteFunction state initialized");
+    }
+
+    /**
+     * Sets the SchemaEvolutionClient from the operator level since functions don't have direct
+     * access to TaskOperatorEventGateway.
+     */
+    public void setSchemaEvolutionClient(SchemaEvolutionClient schemaEvolutionClient) {
+        this.schemaEvolutionClient = schemaEvolutionClient;
+        LOG.info("SchemaEvolutionClient set for MultiTableEventStreamWriteFunction");
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.tableFunctions = new HashMap<>();
+        this.initializedTables = new HashMap<>();
+        // Don't reinitialize schemaMaps if it already has restored schemas from state
+        if (this.schemaMaps == null) {
+            this.schemaMaps = new HashMap<>();
+        }
+        this.tableConfigurations = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(Event event, Context ctx, Collector<RowData> out) throws Exception {
+        LOG.debug("Processing event of type: {}", event.getClass().getSimpleName());
+
+        // Route event to appropriate handler based on type
+        if (event instanceof DataChangeEvent) {
+            processDataChange((DataChangeEvent) event);
+        } else if (event instanceof SchemaChangeEvent) {
+            processSchemaChange((SchemaChangeEvent) event);
+        } else if (event instanceof FlushEvent) {
+            processFlush((FlushEvent) event);
+        } else {
+            LOG.warn("Received unknown event type: {}", event.getClass().getName());
+        }
+    }
+
+    /**
+     * Processes schema events. For a {@link CreateTableEvent}, it ensures that the coordinator is
+     * notified and the physical Hudi table is created. For a {@link SchemaChangeEvent}, it updates
+     * the local schema cache.
+     *
+     * <p>Implements {@link EventProcessorFunction#processSchemaChange(SchemaChangeEvent)}.
+     */
+    @Override
+    public void processSchemaChange(SchemaChangeEvent event) throws Exception {
+        TableId tableId = event.tableId();
+        try {
+            if (event instanceof CreateTableEvent) {
+                CreateTableEvent createTableEvent = (CreateTableEvent) event;
+                schemaMaps.put(tableId, createTableEvent.getSchema());
+                LOG.debug("Cached schema for new table: {}", tableId);
+
+                initializedTables.computeIfAbsent(
+                        tableId,
+                        tId -> {
+                            try {
+                                // Send an explicit event to the coordinator so it can prepare
+                                // resources *before* we attempt to write any data.
+                                getOperatorEventGateway()
+                                        .sendEventToCoordinator(
+                                                new CreateTableOperatorEvent(createTableEvent));
+                                LOG.info(
+                                        "Sent CreateTableOperatorEvent to coordinator for new table: {}",
+                                        tId);
+
+                                // Now, create the physical dir for Hudi table.
+                                Configuration tableConfig = createTableSpecificConfig(tId);
+                                createHudiTablePath(tableConfig, tId);
+                            } catch (Exception e) {
+                                // Re-throw to fail the Flink task if initialization fails.
+                                throw new RuntimeException(
+                                        "Failed during first-time initialization for table: " + tId,
+                                        e);
+                            }
+                            return true; // Mark as initialized for this function instance.
+                        });
+                // Ensure tableFunction is initialized
+                getOrCreateTableFunction(tableId);
+            } else if (event instanceof SchemaChangeEvent) {
+                SchemaChangeEvent schemaChangeEvent = event;
+                Schema existingSchema = schemaMaps.get(tableId);
+                if (existingSchema != null
+                        && !SchemaUtils.isSchemaChangeEventRedundant(
+                                existingSchema, schemaChangeEvent)) {
+
+                    LOG.info(
+                            "Schema change event received for table {}: {}",
+                            tableId,
+                            schemaChangeEvent);
+                    LOG.info(
+                            "Existing schema for table {} has {} columns: {}",
+                            tableId,
+                            existingSchema.getColumnCount(),
+                            existingSchema.getColumnNames());
+
+                    Schema newSchema =
+                            SchemaUtils.applySchemaChangeEvent(existingSchema, schemaChangeEvent);
+
+                    LOG.info(
+                            "New schema for table {} has {} columns: {}",
+                            tableId,
+                            newSchema.getColumnCount(),
+                            newSchema.getColumnNames());
+
+                    schemaMaps.put(tableId, newSchema);
+
+                    // Invalidate cached table configuration so it gets recreated with NEW
+                    // schema
+                    // The tableConfigurations cache holds FlinkOptions.SOURCE_AVRO_SCHEMA which
+                    // must be updated
+                    tableConfigurations.remove(tableId);
+                    LOG.info(
+                            "Invalidated cached table configuration for {} to pick up new schema",
+                            tableId);
+
+                    // If table function exists, flush buffers and update its rowType
+                    EventBucketStreamWriteFunction tableFunction = tableFunctions.get(tableId);
+                    if (tableFunction != null) {
+                        LOG.info(
+                                "Schema changed for table {}, flushing buffers with OLD schema and updating to NEW RowType",
+                                tableId);
+                        // NOTE: Capture the OLD RowType before any changes
+                        // Buffered records were created with this schema
+                        RowType oldRowType = convertSchemaToFlinkRowType(existingSchema);
+
+                        // Flush existing buffers using the OLD schema
+                        // This ensures records buffered with N columns are read with N-column
+                        // schema
+                        tableFunction.flushAllBuckets(oldRowType);
+
+                        // Now safe to update to the NEW schema
+                        // Future records will use this new schema
+                        RowType newRowType = convertSchemaToFlinkRowType(newSchema);
+                        tableFunction.updateRowType(newRowType);
+
+                        String newAvroSchema =
+                                AvroSchemaConverter.convertToSchema(newRowType).toString();
+
+                        LOG.info(
+                                "Updating write client for table: {} with new schema: {}",
+                                tableId,
+                                newAvroSchema);
+
+                        // Update write client's source avro schema with new schema
+                        tableFunction.updateWriteClientWithNewSchema(newAvroSchema);
+
+                        LOG.info("Successfully handled schema change for table: {}", tableId);
+                    }
+
+                    LOG.debug("Updated schema for table: {}", tableId);
+                }
+            }
+
+            // Forward the event to tableFunction so that schemaMap for serializer is updated
+            tableFunctions.get(event.tableId()).processSchemaChange(event);
+        } catch (Exception e) {
+            LOG.error("Failed to process schema event for table: {}", tableId, e);
+            throw new RuntimeException("Failed to process schema event for table: " + tableId, e);
+        }
+    }
+
+    /**
+     * Processes change events (ChangeEvent) for writing. This triggers the actual Hudi write
+     * operations as side effects by delegating to table-specific functions.
+     *
+     * <p>Implements {@link EventProcessorFunction#processDataChange(DataChangeEvent)}.
+     */
+    @Override
+    public void processDataChange(DataChangeEvent event) throws Exception {
+        TableId tableId = event.tableId();
+        try {
+            LOG.debug("Processing change event for table: {}", tableId);
+
+            // Get or create table-specific function to handle this event
+            EventBucketStreamWriteFunction tableFunction = getOrCreateTableFunction(tableId);
+
+            // Use the table function to process the change event
+            // This will convert the event to HoodieFlinkInternalRow and buffer it for writing
+            tableFunction.processDataChange(event);
+
+            LOG.debug("Successfully processed change event for table: {}", tableId);
+
+        } catch (Exception e) {
+            LOG.error("Failed to process change event for table: {}", tableId, e);
+            throw new RuntimeException("Failed to process change event for table: " + tableId, e);
+        }
+    }
+
+    public static void createHudiTablePath(Configuration config, TableId tableId)
+            throws IOException {
+        String tablePath = config.get(FlinkOptions.PATH);
+        Path path = Paths.get(tablePath);
+        if (!Files.exists(path)) {
+            Files.createDirectories(path);
+        }
+    }
+
+    /**
+     * Processes a flush event for a specific table function. This simulates the FlushEvent
+     * processing that would normally happen in EventStreamWriteFunction.processElement.
+     */
+    private void processFlushForTableFunction(
+            EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
+        try {

Review Comment:
   No need to use reflection here now, right? Just call `tableFunction.flushRemaining(false);` directly.
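   For example, the helper could collapse to something like this (a rough sketch, assuming `flushRemaining(boolean)` is, or is made, accessible from this class):
   ```java
   private void processFlushForTableFunction(
           EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
       // Flush this table's buffered records directly; passing false keeps the
       // function open for further writes (true would signal end of input).
       tableFunction.flushRemaining(false);
   }
   ```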



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/HudiRecordEventSerializer.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.event;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HudiRecordSerializer} for converting {@link Event} into {@link HoodieFlinkInternalRow}
+ * for Hudi writing.
+ *
+ * <p>This serializer maintains schema state per table and handles multi-table CDC events by:
+ *
+ * <ul>
+ *   <li>Caching schemas from CreateTableEvent and SchemaChangeEvent
+ *   <li>Converting DataChangeEvent to HoodieFlinkInternalRow using cached schemas
+ *   <li>Supporting bucket-wrapped events from upstream operators
+ * </ul>
+ *
+ * <p>Assumes that CreateTableEvent will always arrive before DataChangeEvent for each table,
+ * following the standard CDC pipeline startup sequence.
+ */
+public class HudiRecordEventSerializer implements HudiRecordSerializer<Event> {

Review Comment:
   Seems `HudiRecordEventSerializer` is designed to handle serialization for multiple tables. As with the comments on `EventStreamWriteFunction`, shouldn't `HudiRecordEventSerializer` be a field of `MultiTableStreamWriteOperatorCoordinator`, serializing each data change event to a `HoodieFlinkInternalRow` and dispatching it to the corresponding table write function?
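   Roughly the shape I have in mind (illustrative sketch only; `processData` is the hypothetical narrowed entry point mentioned in the `EventStreamWriteFunction` comments, not an existing method):
   ```java
   // One schema-aware serializer held at the multi-table level; each DataChangeEvent is
   // converted here and the resulting row is dispatched to the per-table write function.
   private HudiRecordEventSerializer recordSerializer;

   public void processDataChange(DataChangeEvent event) throws Exception {
       HoodieFlinkInternalRow row = recordSerializer.serialize(event); // Event -> internal row
       EventBucketStreamWriteFunction tableFunction = getOrCreateTableFunction(event.tableId());
       tableFunction.processData(row); // hypothetical narrowed API
   }
   ```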



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/EventBucketStreamWriteFunction.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.function;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.connectors.hudi.sink.event.HudiRecordEventSerializer;
+import org.apache.flink.cdc.connectors.hudi.sink.event.HudiRecordSerializer;
+import org.apache.flink.cdc.connectors.hudi.sink.model.BucketAssignmentIndex;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
+import org.apache.hudi.utils.RuntimeContextUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Extension of EventStreamWriteFunction to handle bucketing. */
+public class EventBucketStreamWriteFunction extends EventStreamWriteFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventBucketStreamWriteFunction.class);
+
+    private int parallelism;
+
+    private boolean isNonBlockingConcurrencyControl;
+
+    /** BucketID to file group mapping in each partition of a tableId. */
+    private BucketAssignmentIndex bucketAssignmentIndex;
+
+    /**
+     * Incremental bucket index of the current checkpoint interval, it is needed because the bucket
+     * type('I' or 'U') should be decided based on the committed files view, all the records in one
+     * bucket should have the same bucket type.
+     */
+    private Set<String> incBucketIndexes;
+
+    /** Serializer for converting Events to HoodieFlinkInternalRow for single table. */
+    private HudiRecordEventSerializer recordSerializer;
+
+    /** Function for calculating the task partition to dispatch. */
+    private Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc;
+
+    /** Function to calculate num buckets per partition. */
+    private NumBucketsFunction numBucketsFunction;
+
+    /** Cached primary key fields for this table. */
+    private transient List<String> primaryKeyFields;
+
+    /** Cached field getters for primary key fields. */
+    private transient List<RecordData.FieldGetter> primaryKeyFieldGetters;
+
+    /** Cached schema for this table. */
+    private transient Schema cachedSchema;
+
+    /** Number of buckets for this function. */
+    private int numBuckets;
+
+    /**
+     * Constructs a BucketStreamWriteFunction.
+     *
+     * @param config The config options
+     */
+    public EventBucketStreamWriteFunction(Configuration config, RowType rowType) {
+        super(config, rowType);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws IOException {
+        super.open(parameters);
+        this.isNonBlockingConcurrencyControl =
+                OptionsResolver.isNonBlockingConcurrencyControl(config);
+        this.taskID = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+        this.parallelism = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+        this.bucketAssignmentIndex = new BucketAssignmentIndex();
+        this.incBucketIndexes = new HashSet<>();
+        this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(parallelism);
+        this.numBucketsFunction =
+                new NumBucketsFunction(
+                        config.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+                        config.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
+                        config.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+
+        this.numBuckets = config.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+
+        // Initialize record serializer with system default zone ID
+        this.recordSerializer = new HudiRecordEventSerializer(ZoneId.systemDefault());
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        super.initializeState(context);
+        // Bootstrap will reload file groups from Hudi on startup
+    }
+
+    @Override
+    public void snapshotState() {
+        LOG.info("Triggering snapshotState");
+        super.snapshotState();
+        this.incBucketIndexes.clear();
+    }
+
+    @Override
+    public void processDataChange(DataChangeEvent event) throws Exception {
+        // Check if schema is available before processing
+        if (!recordSerializer.hasSchema(event.tableId())) {
+            // Schema not available yet - CreateTableEvent hasn't arrived
+            throw new IllegalStateException(
+                    "No schema available for table "
+                            + event.tableId()
+                            + ". CreateTableEvent should arrive before DataChangeEvent.");
+        }
+
+        HoodieFlinkInternalRow hoodieFlinkInternalRow = recordSerializer.serialize(event);
+        // Calculate bucket from event data for bucket assignment
+        int bucket = calculateBucketFromEvent(event);
+
+        // Define record location (file ID, instant time) based on bucket assignment
+        defineRecordLocation(bucket, hoodieFlinkInternalRow);
+
+        // Buffer the record for writing
+        bufferRecord(hoodieFlinkInternalRow);
+
+        LOG.debug(
+                "Processed DataChangeEvent for table {}: partition={}, fileId={}, instantTime={}",
+                event.tableId(),
+                hoodieFlinkInternalRow.getPartitionPath(),
+                hoodieFlinkInternalRow.getFileId(),
+                hoodieFlinkInternalRow.getInstantTime());
+    }
+
+    @Override
+    public void processSchemaChange(SchemaChangeEvent event) throws Exception {
+        // Single-table functions typically receive schema via serializer setup
+        // This is called when CreateTableEvent arrives
+        LOG.info("Schema change event received: {}", event);
+
+        // Handle schema events (CreateTableEvent, SchemaChangeEvent) - they don't produce records
+        // null will be returned from serialize
+        recordSerializer.serialize(event);
+    }
+
+    private void defineRecordLocation(int bucketNum, HoodieFlinkInternalRow record) {
+        final String partition = record.getPartitionPath();
+
+        // Check if this task should handle this bucket
+        if (!isBucketToLoad(bucketNum, partition)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Task %d received record for bucket %d which should not be handled by this task. "
+                                    + "This indicates a partitioning problem - records must be routed to the correct task.",
+                            taskID, bucketNum));
+        }
+
+        bootstrapIndexIfNeed(partition);
+        Map<Integer, String> bucketToFileId = bucketAssignmentIndex.getBucketToFileIdMap(partition);
+        final String bucketId = partition + "/" + bucketNum;
+
+        if (incBucketIndexes.contains(bucketId)) {
+            record.setInstantTime("I");
+            record.setFileId(bucketToFileId.get(bucketNum));
+        } else if (bucketToFileId.containsKey(bucketNum)) {
+            record.setInstantTime("U");
+            record.setFileId(bucketToFileId.get(bucketNum));
+        } else {
+            String newFileId =
+                    isNonBlockingConcurrencyControl
+                            ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum)
+                            : BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+            record.setInstantTime("I");
+            record.setFileId(newFileId);
+            bucketToFileId.put(bucketNum, newFileId);
+            incBucketIndexes.add(bucketId);
+        }
+    }
+
+    /**
+     * Determine whether the current fileID belongs to the current task. partitionIndex == this
+     * taskID belongs to this task.
+     */
+    public boolean isBucketToLoad(int bucketNumber, String partition) {
+        int numBuckets = numBucketsFunction.getNumBuckets(partition);
+        return partitionIndexFunc.apply(numBuckets, partition, bucketNumber) == taskID;
+    }
+
+    /**
+     * Get partition_bucket -> fileID mapping from the existing hudi table. This is a required
+     * operation for each restart to avoid having duplicate file ids for one bucket.
+     */
+    private void bootstrapIndexIfNeed(String partition) {
+        if (bucketAssignmentIndex.containsPartition(partition)) {
+            return;
+        }
+        LOG.info(
+                "Loading Hoodie Table {}, with path {}/{}",
+                this.metaClient.getTableConfig().getTableName(),
+                this.metaClient.getBasePath(),
+                partition);
+
+        // Load existing fileID belongs to this task
+        Map<Integer, String> bucketToFileIDMap = new HashMap<>();
+        this.writeClient
+                .getHoodieTable()
+                .getHoodieView()
+                .getLatestFileSlices(partition)
+                .forEach(
+                        fileSlice -> {
+                            String fileId = fileSlice.getFileId();
+                            int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
+                            if (isBucketToLoad(bucketNumber, partition)) {
+                                LOG.info(
+                                        String.format(
+                                                "Should load this partition bucket %s with fileId %s",
+                                                bucketNumber, fileId));
+                                // Validate that one bucketId has only ONE fileId
+                                if (bucketToFileIDMap.containsKey(bucketNumber)) {
+                                    throw new RuntimeException(
+                                            String.format(
+                                                    "Duplicate fileId %s from bucket %s of partition %s found "
+                                                            + "during the BucketStreamWriteFunction index bootstrap.",
+                                                    fileId, bucketNumber, partition));
+                                } else {
+                                    LOG.info(
+                                            String.format(
+                                                    "Adding fileId %s to the bucket %s of partition %s.",
+                                                    fileId, bucketNumber, partition));
+                                    bucketToFileIDMap.put(bucketNumber, fileId);
+                                }
+                            }
+                        });
+        bucketAssignmentIndex.bootstrapPartition(partition, bucketToFileIDMap);
+    }
+
+    /** Calculate bucket from DataChangeEvent using primary key fields. */
+    private int calculateBucketFromEvent(DataChangeEvent dataChangeEvent) {
+        // Initialize cache on first call
+        if (cachedSchema == null) {
+            cachedSchema = recordSerializer.getSchema(dataChangeEvent.tableId());
+            if (cachedSchema == null) {
+                throw new IllegalStateException(
+                        "No schema available for table " + dataChangeEvent.tableId());
+            }
+
+            // Cache primary key fields
+            primaryKeyFields = cachedSchema.primaryKeys();
+            if (primaryKeyFields.isEmpty()) {
+                throw new IllegalStateException(
+                        "Cannot calculate bucket: table "
+                                + dataChangeEvent.tableId()
+                                + " has no primary keys");
+            }
+
+            // Cache field getters for primary key fields
+            primaryKeyFieldGetters = new ArrayList<>(primaryKeyFields.size());
+            for (String primaryKeyField : primaryKeyFields) {
+                int fieldIndex = cachedSchema.getColumnNames().indexOf(primaryKeyField);
+                if (fieldIndex == -1) {
+                    throw new IllegalStateException(
+                            "Primary key field '"
+                                    + primaryKeyField
+                                    + "' not found in schema for table "
+                                    + dataChangeEvent.tableId());
+                }
+                DataType fieldType = cachedSchema.getColumns().get(fieldIndex).getType();
+                primaryKeyFieldGetters.add(RecordData.createFieldGetter(fieldType, fieldIndex));
+            }
+        }
+
+        // Extract record key from event data using cached field getters
+        String recordKey = extractRecordKeyFromEvent(dataChangeEvent);

Review Comment:
   The record key can be obtained from `HoodieFlinkInternalRow` directly by calling `HoodieFlinkInternalRow#getRecordKey()`, so `extractRecordKeyFromEvent` is unnecessary and `primaryKeyFieldGetters` can be removed.
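   For example, `calculateBucketFromEvent` could be dropped in favor of something like this in `processDataChange` (a sketch; `indexKeyFields` stands for the configured index key fields, and the `BucketIdentifier.getBucketId(String, String, int)` overload is assumed to be available):
   ```java
   HoodieFlinkInternalRow row = recordSerializer.serialize(event);
   // The serialized row already carries the record key, so no primary-key field getters are needed.
   String recordKey = row.getRecordKey();
   int bucket = BucketIdentifier.getBucketId(recordKey, indexKeyFields, numBuckets);
   defineRecordLocation(bucket, row);
   bufferRecord(row);
   ```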



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/EventStreamWriteFunction.java:
##########
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.function;
+
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.MappingIterator;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
+import org.apache.hudi.sink.buffer.RowDataBucket;
+import org.apache.hudi.sink.buffer.TotalSizeTracer;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
+import org.apache.hudi.sink.transform.RecordConverter;
+import org.apache.hudi.sink.utils.BufferUtils;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.FlinkWriteHelper;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.MutableIteratorWrapperIterator;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/** Base infrastructures for streaming writer function to handle Events. */
+public abstract class EventStreamWriteFunction extends AbstractStreamWriteFunction<Event>
+        implements EventProcessorFunction {

Review Comment:
   There seems to be no need to implement `EventProcessorFunction`; in fact, `processSchemaChange` and `processFlush` of `EventStreamWriteFunction` will never be called.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/EventStreamWriteFunction.java:
##########
@@ -0,0 +1,709 @@
+/** Base infrastructures for streaming writer function to handle Events. */
+public abstract class EventStreamWriteFunction extends AbstractStreamWriteFunction<Event>
+        implements EventProcessorFunction {

Review Comment:
   We should make minimal changes to `StreamWriteFunction` and `BucketStreamWriteFunction`; the generic type should be kept as `HoodieFlinkInternalRow`. We can confine `Event` handling to `MultiTableEventStreamWriteFunction`, so that `StreamWriteFunction` only needs to provide the following operations (see the sketch after this list):
   * flushRemaining(): called when a flush event is received.
   * updateSchema()?: called when a schema change event is received and the inner schema or related fields (e.g. index fields) need to be updated.
   * processData(HoodieFlinkInternalRow): DataChangeEvent should be converted to HoodieFlinkInternalRow in MultiTableEventStreamWriteFunction.
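   Something along these lines, as an illustrative sketch of the narrowed surface (the interface name and everything except flushRemaining are placeholders, not existing Hudi APIs):
   ```java
   import org.apache.flink.table.types.logical.RowType;

   import org.apache.hudi.client.model.HoodieFlinkInternalRow;

   /** Sketch of what the per-table write function would expose to the multi-table wrapper. */
   interface PerTableWriteHandle {

       /** Buffer one record that was already converted from a DataChangeEvent upstream. */
       void processData(HoodieFlinkInternalRow record) throws Exception;

       /** Flush buffered records; called when a flush event is received. */
       void flushRemaining(boolean endInput);

       /** Refresh schema-dependent state (e.g. index/key fields) after a schema change event. */
       void updateSchema(RowType newRowType);
   }
   ```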



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
