voonhous commented on code in PR #4164:
URL: https://github.com/apache/flink-cdc/pull/4164#discussion_r2498831667
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java:
##########
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.coordinator;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
+import org.apache.hudi.sink.utils.EventBuffers;
+import org.apache.hudi.sink.utils.ExplicitClassloaderThreadFactory;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.configuration.FlinkOptions.COMPACTION_DELTA_COMMITS;
+
+/**
+ * A custom OperatorCoordinator that manages Hudi writes for multiple tables.
+ *
+ * <p>This coordinator extends the default {@link StreamWriteOperatorCoordinator}. The parent class
+ * is designed for a single destination table, so its core logic (e.g., for commits and
+ * checkpointing) cannot be reused directly for a multi-table sink.
+ *
+ * <p>Therefore, this implementation overrides the essential lifecycle methods to manage a
+ * collection of per-table resources. It dynamically creates and manages a dedicated {@link
+ * HoodieFlinkWriteClient}, {@link EventBuffers}, and timeline for each table that appears in the
+ * upstream CDC data.
+ */
+public class MultiTableStreamWriteOperatorCoordinator extends StreamWriteOperatorCoordinator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiTableStreamWriteOperatorCoordinator.class);
+
+    /**
+     * A custom coordination request that includes the TableId to request an instant for a specific
+     * table.
+     */
+    public static class MultiTableInstantTimeRequest implements CoordinationRequest, Serializable {
+        private static final long serialVersionUID = 1L;
+        private final long checkpointId;
+        private final TableId tableId;
+
+        public MultiTableInstantTimeRequest(long checkpointId, TableId tableId) {
+            this.checkpointId = checkpointId;
+            this.tableId = tableId;
+        }
+
+        public long getCheckpointId() {
+            return checkpointId;
+        }
+
+        public TableId getTableId() {
+            return tableId;
+        }
+    }
+
+    /**
+     * Encapsulates all state and resources for a single table. This simplifies management by
+     * grouping related objects, making the coordinator logic cleaner and less prone to errors.
+     */
+    private static class TableContext implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final transient HoodieFlinkWriteClient<?> writeClient;
+        final EventBuffers eventBuffers;
+        final TableState tableState;
+        final String tablePath;
+
+        TableContext(
+                HoodieFlinkWriteClient<?> writeClient,
+                EventBuffers eventBuffers,
+                TableState tableState,
+                String tablePath) {
+            this.writeClient = writeClient;
+            this.eventBuffers = eventBuffers;
+            this.tableState = tableState;
+            this.tablePath = tablePath;
+        }
+
+        void close() {
+            if (writeClient != null) {
+                try {
+                    writeClient.close();
+                } catch (Exception e) {
+                    LOG.error("Error closing write client for table path: {}", tablePath, e);
+                }
+            }
+        }
+    }
+
+    /** A container for table-specific configuration and state. */
+    private static class TableState implements Serializable {
+        private static final long serialVersionUID = 1L;
+        final String commitAction;
+        final boolean isOverwrite;
+        final WriteOperationType operationType;
+        final boolean scheduleCompaction;
+        final boolean scheduleClustering;
+        final boolean isDeltaTimeCompaction;
+
+        // Event-driven compaction tracking - tracks actual write activity
+        long commitsSinceLastCompaction = 0;
+        // For MOR tables, track log file growth
+        long totalLogBytesWritten = 0;
+
+        final int commitsThreshold;
+
+        TableState(Configuration conf) {
+            this.operationType =
+                    WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+            this.commitAction =
+                    CommitUtils.getCommitActionType(
+                            this.operationType,
+                            HoodieTableType.valueOf(
+                                    conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase()));
+            this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
+            this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
+            this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
+            this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
+            this.commitsThreshold = conf.get(COMPACTION_DELTA_COMMITS);
+        }
+
+        /**
+         * Updates compaction metrics based on write statuses. Skips empty commits where no actual
+         * data was written.
+         *
+         * @param writeStatuses The write statuses from the latest commit
+         * @return true if this commit had actual writes, false if it was empty
+         */
+        boolean updateCompactionMetrics(List<WriteStatus> writeStatuses) {
+            if (writeStatuses == null || writeStatuses.isEmpty()) {
+                LOG.debug("No write statuses - skipping compaction metric update");
+                return false;
+            }
+
+            // Check if any actual writes occurred (skip empty commits)
+            long totalWrites =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getNumWrites)
+                            .sum();
+
+            if (totalWrites == 0) {
+                LOG.debug(
+                        "Empty commit detected (numWrites=0) - skipping compaction metric update");
+                return false;
+            }
+
+            // Track log file bytes written (for MOR tables)
+            long bytesWritten =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getTotalWriteBytes)
+                            .sum();
+
+            commitsSinceLastCompaction++;
+            totalLogBytesWritten += bytesWritten;
+
+            LOG.debug(
+                    "Updated compaction metrics: commits={}, bytes={}",
+                    commitsSinceLastCompaction,
+                    totalLogBytesWritten);
+            return true;
+        }
+
+        /** Resets compaction metrics after compaction is scheduled. */
+        void resetCompactionMetrics() {
+            commitsSinceLastCompaction = 0;
+            totalLogBytesWritten = 0;
+        }
+
+        /**
+         * Determines if compaction should be triggered based on write activity. Only triggers for
+         * MOR tables with actual data writes.
+         *
+         * @return true if compaction should be scheduled
+         */
+        boolean shouldTriggerCompaction() {
+            // Only trigger for MOR tables (DELTA_COMMIT means log files)
+            if (!commitAction.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+                return false;
+            }
+
+            return commitsSinceLastCompaction >= commitsThreshold;
+        }
+    }
+
+    /** The base Flink configuration. */
+    private final Configuration baseConfig;
+
+    /**
+     * A single, unified map holding the context for each managed table. The key is the {@link
+     * TableId}, providing a centralized place for all per-table resources.
+     */
+    private final Map<TableId, TableContext> tableContexts = new ConcurrentHashMap<>();
+
+    /** A reverse lookup map from table path to TableId for efficient event routing. */
+    private final Map<String, TableId> pathToTableId = new ConcurrentHashMap<>();
+
+    /** Cache of schemas per table for config creation. */
+    private final Map<TableId, Schema> tableSchemas = new ConcurrentHashMap<>();
+
+    /**
+     * Gateways for sending events to sub-tasks. This field is necessary because the parent's
+     * `gateways` array is private and not initialized if we don't call super.start().
+     */
+    private transient SubtaskGateway[] gateways;
+
+    /**
+     * A dedicated write client whose only job is to run the embedded timeline server. This ensures
+     * there is only one timeline server for the entire job.
+     */
+    private transient HoodieFlinkWriteClient<?> timelineServerClient;
+
+    /** A single-thread executor to handle instant time requests, mimicking the parent behavior. */
+    private transient NonThrownExecutor instantRequestExecutor;
+
+    public MultiTableStreamWriteOperatorCoordinator(Configuration conf, Context context) {
+        super(conf, context);
+        conf.setString("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        this.baseConfig = conf;
+        LOG.info(
+                "MultiTableStreamWriteOperatorCoordinator initialized for operator: {} with config: {}",
+                context.getOperatorId(),
+                baseConfig);
+    }
+
+    @Override
+    public void start() throws Exception {
+        // Hadoop's FileSystem API uses Java's ServiceLoader to find implementations for
+        // URI schemes (like 'file://'). The ServiceLoader relies on the thread's context
+        // classloader. The parent class sets this, but our overridden start() method must
+        // do so as well to ensure file system implementations can be found.
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+        // Initialize the executor service, which is a protected field in the parent class.
+        // This logic is borrowed from the parent's start() method as we cannot call super.start().
+        this.executor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-coord-event-handler",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .waitForTasksFinish(true)
+                        .build();
+
+        // Executor for handling instant requests.
+        this.instantRequestExecutor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-instant-request",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .build();
+
+        // Initialize the gateways array to avoid NullPointerException when subtasks are ready.
+        this.gateways = new SubtaskGateway[context.currentParallelism()];
+
+        // Initialize a single write client for the coordinator path.
+        // Its primary role is to start and manage the embedded timeline server.
+        try {
+            // The baseConfig points to the dummy coordinator path.
+            // A .hoodie directory is required for the timeline server to start.
+            StreamerUtil.initTableIfNotExists(this.baseConfig);
+            this.timelineServerClient = FlinkWriteClients.createWriteClient(this.baseConfig);

Review Comment:
   This is the "coordinator" write client: it starts an embedded timeline server, and all the other tables use this coordinator's timeline server for FileSystemView requests.
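   To make that sharing concrete, below is a minimal sketch of how a per-table configuration could be derived from `baseConfig` so that the table's write client reuses the coordinator's embedded timeline server instead of starting its own. The helper name, the way the server host/port are obtained, and the use of these particular Hudi option keys are assumptions for illustration, not code from this PR.

```java
import org.apache.flink.configuration.Configuration;

import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions;

/** Sketch only: derive a per-table config that points at the coordinator's timeline server. */
final class PerTableConfigSketch {

    /**
     * @param base      the coordinator's base Flink configuration
     * @param tablePath the Hudi base path of this destination table
     * @param viewHost  host of the coordinator's embedded timeline server (assumed to be known)
     * @param viewPort  port of the coordinator's embedded timeline server (assumed to be known)
     */
    static Configuration forTable(Configuration base, String tablePath, String viewHost, int viewPort) {
        Configuration conf = new Configuration(base);
        conf.setString(FlinkOptions.PATH.key(), tablePath);

        // Do not start another embedded timeline server for this table's write client.
        conf.setString("hoodie.embed.timeline.server", "false");

        // Route this table's FileSystemView requests to the coordinator's single timeline server.
        conf.setString(
                FileSystemViewStorageConfig.VIEW_TYPE.key(),
                FileSystemViewStorageType.REMOTE_FIRST.name());
        conf.setString(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key(), viewHost);
        conf.setString(FileSystemViewStorageConfig.REMOTE_PORT_NUM.key(), String.valueOf(viewPort));
        return conf;
    }
}
```

   Funneling every table through one timeline server avoids spinning up a separate view server (and port) per destination table on the coordinator, which matters once the pipeline fans out to many tables.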
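   In the same spirit, here is a rough sketch of how the coordinator side could route the `MultiTableInstantTimeRequest` defined above to the matching per-table context. `instantForTable(...)` is a hypothetical placeholder for however the coordinator resolves the instant and wraps the response (e.g. via `CoordinationResponseSerDe`), and delegating other request types to `super.handleCoordinationRequest` assumes the parent class exposes that hook in the Hudi version used here.

```java
// Sketch only, imagined inside MultiTableStreamWriteOperatorCoordinator; not the PR's actual code.
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
    if (request instanceof MultiTableInstantTimeRequest) {
        MultiTableInstantTimeRequest req = (MultiTableInstantTimeRequest) request;
        CompletableFuture<CoordinationResponse> future = new CompletableFuture<>();
        instantRequestExecutor.execute(
                () -> {
                    TableContext tableContext = tableContexts.get(req.getTableId());
                    if (tableContext == null) {
                        future.completeExceptionally(
                                new HoodieException("No context registered for table " + req.getTableId()));
                        return;
                    }
                    // Placeholder: resolve the instant for this table/checkpoint and wrap it
                    // into a CoordinationResponse.
                    future.complete(instantForTable(tableContext, req.getCheckpointId()));
                },
                "answer instant time request for table %s",
                req.getTableId());
        return future;
    }
    // Any other request type falls back to the single-table parent behavior.
    return super.handleCoordinationRequest(request);
}
```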
