voonhous commented on code in PR #4164:
URL: https://github.com/apache/flink-cdc/pull/4164#discussion_r2498353893


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/MultiTableEventStreamWriteFunction.java:
##########
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.function;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import 
org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
+import 
org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
+import 
org.apache.flink.cdc.connectors.hudi.sink.event.HudiRecordEventSerializer;
+import 
org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
+import org.apache.flink.cdc.runtime.serializer.schema.SchemaSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ViewStorageProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Multi-table wrapper function that routes events to table-specific
+ * EventExtendedBucketStreamWriteFunction instances. This approach maintains 
table isolation by
+ * creating dedicated function instances per table while keeping the core 
write functions
+ * single-table focused.
+ */
+public class MultiTableEventStreamWriteFunction extends 
AbstractStreamWriteFunction<Event>
+        implements EventProcessorFunction {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiTableEventStreamWriteFunction.class);
+
+    /** Table-specific write functions created dynamically when new tables are 
encountered. */
+    private transient Map<TableId, ExtendedBucketStreamWriteFunction> 
tableFunctions;
+
+    /** Track tables that have been initialized to avoid duplicate 
initialization. */
+    private transient Map<TableId, Boolean> initializedTables;
+
+    /** Cache of schemas per table for RowType generation. */
+    private transient Map<TableId, Schema> schemaMaps;
+
+    /** Persistent state for schemas to survive checkpoints/savepoints. */
+    private transient ListState<Tuple2<TableId, Schema>> schemaState;
+
+    private transient Map<TableId, Configuration> tableConfigurations;
+
+    /** Schema evolution client to communicate with SchemaOperator. */
+    private transient SchemaEvolutionClient schemaEvolutionClient;
+
+    /** Serializer for converting Events to HoodieFlinkInternalRow. */
+    private transient HudiRecordEventSerializer recordSerializer;
+
+    /** Store the function initialization context for table functions. */
+    private transient FunctionInitializationContext 
functionInitializationContext;
+
+    public MultiTableEventStreamWriteFunction(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+        this.functionInitializationContext = context;
+
+        // Initialize schema map before restoring state
+        if (this.schemaMaps == null) {
+            this.schemaMaps = new HashMap<>();
+        }
+
+        // Initialize schema state for persistence across 
checkpoints/savepoints
+        // Using operator state since this is not a keyed stream
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        TupleSerializer<Tuple2<TableId, Schema>> tupleSerializer =
+                new TupleSerializer(
+                        Tuple2.class,
+                        new TypeSerializer[] {
+                            TableIdSerializer.INSTANCE, 
SchemaSerializer.INSTANCE
+                        });
+        ListStateDescriptor<Tuple2<TableId, Schema>> schemaStateDescriptor =
+                new ListStateDescriptor<>("schemaState", tupleSerializer);
+        this.schemaState = 
context.getOperatorStateStore().getUnionListState(schemaStateDescriptor);
+
+        // Restore schemas from state if this is a restore operation
+        if (context.isRestored()) {
+            LOG.info("Restoring schemas from state");
+            for (Tuple2<TableId, Schema> entry : schemaState.get()) {
+                schemaMaps.put(entry.f0, entry.f1);
+                LOG.info("Restored schema for table: {}", entry.f0);
+            }
+            LOG.info("Restored {} schemas from state", schemaMaps.size());
+        }
+
+        LOG.info("MultiTableEventStreamWriteFunction state initialized");
+    }
+
+    /**
+     * Sets the SchemaEvolutionClient from the operator level since functions 
don't have direct
+     * access to TaskOperatorEventGateway.
+     */
+    public void setSchemaEvolutionClient(SchemaEvolutionClient 
schemaEvolutionClient) {
+        this.schemaEvolutionClient = schemaEvolutionClient;
+        LOG.info("SchemaEvolutionClient set for 
MultiTableEventStreamWriteFunction");
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.tableFunctions = new HashMap<>();
+        this.initializedTables = new HashMap<>();
+        // Don't reinitialize schemaMaps if it already has restored schemas 
from state
+        if (this.schemaMaps == null) {
+            this.schemaMaps = new HashMap<>();
+        }
+        this.tableConfigurations = new HashMap<>();
+        // Initialize record serializer (must be done in open() since it's 
transient)
+        this.recordSerializer = new 
HudiRecordEventSerializer(ZoneId.systemDefault());
+
+        // Restore schemas to recordSerializer if they were restored from state
+        // recordSerializer is transient and does not persist across restarts
+        if (!schemaMaps.isEmpty()) {
+            LOG.info("Restoring {} schemas to recordSerializer", 
schemaMaps.size());
+            for (Map.Entry<TableId, Schema> entry : schemaMaps.entrySet()) {
+                recordSerializer.setSchema(entry.getKey(), entry.getValue());
+                LOG.debug("Restored schema to recordSerializer for table: {}", 
entry.getKey());
+            }
+        }
+    }
+
+    @Override
+    public void processElement(Event event, Context ctx, Collector<RowData> 
out) throws Exception {
+        LOG.debug("Processing event of type: {}", 
event.getClass().getSimpleName());
+
+        // Route event to appropriate handler based on type
+        if (event instanceof DataChangeEvent) {
+            processDataChange((DataChangeEvent) event, ctx, out);
+        } else if (event instanceof SchemaChangeEvent) {
+            processSchemaChange((SchemaChangeEvent) event);
+        } else if (event instanceof FlushEvent) {
+            processFlush((FlushEvent) event);
+        } else {
+            LOG.warn("Received unknown event type: {}", 
event.getClass().getName());
+        }
+    }
+
+    /**
+     * Processes schema events. For a {@link CreateTableEvent}, it ensures 
that the coordinator is
+     * notified and the physical Hudi table is created. For a {@link 
SchemaChangeEvent}, it updates
+     * the local schema cache.
+     *
+     * <p>Implements {@link 
EventProcessorFunction#processSchemaChange(SchemaChangeEvent)}.
+     */
+    @Override
+    public void processSchemaChange(SchemaChangeEvent event) throws Exception {
+        TableId tableId = event.tableId();
+        try {
+            if (event instanceof CreateTableEvent) {
+                CreateTableEvent createTableEvent = (CreateTableEvent) event;
+                schemaMaps.put(tableId, createTableEvent.getSchema());
+                LOG.debug("Cached schema for new table: {}", tableId);
+
+                initializedTables.computeIfAbsent(
+                        tableId,
+                        tId -> {
+                            try {
+                                // Send an explicit event to the coordinator 
so it can prepare
+                                // resources *before* we attempt to write any 
data.
+                                getOperatorEventGateway()
+                                        .sendEventToCoordinator(
+                                                new 
CreateTableOperatorEvent(createTableEvent));
+                                LOG.info(
+                                        "Sent CreateTableOperatorEvent to 
coordinator for new table: {}",
+                                        tId);
+
+                                // Now, create the physical dir for Hudi table.
+                                Configuration tableConfig = 
createTableSpecificConfig(tId);
+                                createHudiTablePath(tableConfig, tId);
+                            } catch (Exception e) {
+                                // Re-throw to fail the Flink task if 
initialization fails.
+                                throw new RuntimeException(
+                                        "Failed during first-time 
initialization for table: " + tId,
+                                        e);
+                            }

Review Comment:
   ```
   initializedTables.computeIfAbsent(
         tableId,
         tId -> {
             return true;  // this return value is what gets stored, equivalent to initializedTables.put(tableId, true)
   });
   ```
   
   `computeIfAbsent` stores whatever the mapping function returns (here `true`) under the key, so there is no need to call `put` again afterwards.
   
   Example:
   <img width="616" height="477" alt="image" src="https://github.com/user-attachments/assets/6e1b8bc8-ce94-4edc-a523-7caa26b477dc" />
   
   I have fixed the relevant calls and lifecycle management of this map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to