yihua commented on code in PR #18409: URL: https://github.com/apache/hudi/pull/18409#discussion_r3034719743
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.io.HoodieFileWriteHandle; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * This class gives skeleton implementation for set of clustering execution strategy + * that use parquet-tools commands. + */ +public abstract class ParquetToolsExecutionStrategy<T extends HoodieRecordPayload<T>> + extends SingleSparkJobExecutionStrategy<T> { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetToolsExecutionStrategy.class); + + public ParquetToolsExecutionStrategy( + HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, + ClusteringGroupInfo clusteringOps, + Map<String, String> strategyParams, + boolean preserveHoodieMetadata, + HoodieSchema schema, + TaskContextSupplier taskContextSupplier, + String instantTime) { + LOG.info("Starting clustering operation on input file ids."); + List<ClusteringOperation> clusteringOperations = clusteringOps.getOperations(); + if (clusteringOperations.size() > 1) { + throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName()); + } + + ClusteringOperation clusteringOperation = clusteringOperations.get(0); + String fileId = FSUtils.createNewFileIdPfx(); + String partitionPath = clusteringOperation.getPartitionPath(); + String dataFilePathStr = clusteringOperation.getDataFilePath(); + StoragePath oldFilePath = new StoragePath(dataFilePathStr); + HoodieFileWriteHandle writeHandler = new HoodieFileWriteHandle(getWriteConfig(), instantTime, getHoodieTable(), + partitionPath, fileId, taskContextSupplier, oldFilePath); + + // Executes the parquet-tools command. + executeTools(new Path(oldFilePath.toUri()), new Path(writeHandler.getPath().toUri())); + return writeHandler.close(); + } Review Comment: 🤖 If `executeTools` throws or fails partway through (e.g. partial file copy), `writeHandler.close()` on line 83 will try to read the output file for metadata and throw a confusing secondary error. Have you considered wrapping this in a try-catch that cleans up the marker and partial output file on failure? ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.io.HoodieFileWriteHandle; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * This class gives skeleton implementation for set of clustering execution strategy + * that use parquet-tools commands. + */ +public abstract class ParquetToolsExecutionStrategy<T extends HoodieRecordPayload<T>> + extends SingleSparkJobExecutionStrategy<T> { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetToolsExecutionStrategy.class); + + public ParquetToolsExecutionStrategy( + HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, + ClusteringGroupInfo clusteringOps, + Map<String, String> strategyParams, + boolean preserveHoodieMetadata, + HoodieSchema schema, + TaskContextSupplier taskContextSupplier, + String instantTime) { + LOG.info("Starting clustering operation on input file ids."); + List<ClusteringOperation> clusteringOperations = clusteringOps.getOperations(); + if (clusteringOperations.size() > 1) { + throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName()); + } + + ClusteringOperation clusteringOperation = clusteringOperations.get(0); + String fileId = FSUtils.createNewFileIdPfx(); + String partitionPath = clusteringOperation.getPartitionPath(); + String dataFilePathStr = clusteringOperation.getDataFilePath(); + StoragePath oldFilePath = new StoragePath(dataFilePathStr); + HoodieFileWriteHandle writeHandler = new HoodieFileWriteHandle(getWriteConfig(), instantTime, getHoodieTable(), + partitionPath, fileId, taskContextSupplier, oldFilePath); + + // Executes the parquet-tools command. + executeTools(new Path(oldFilePath.toUri()), new Path(writeHandler.getPath().toUri())); + return writeHandler.close(); + } + + /** + * This method needs to be overridden by the child classes. + * In this method parquet-tools command can be created and executed. + * Assuming that the parquet-tools command operate per file basis this interface allows command to run once per file. Review Comment: 🤖 The `executeTools` signature uses Hadoop `Path` rather than Hudi's `StoragePath`. Since this PR is migrating to `StoragePath`/`HoodieStorage` APIs, it might be worth using `StoragePath` here too, so subclass implementations aren't forced to depend on Hadoop directly. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.io.HoodieFileWriteHandle; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * This class gives skeleton implementation for set of clustering execution strategy + * that use parquet-tools commands. + */ +public abstract class ParquetToolsExecutionStrategy<T extends HoodieRecordPayload<T>> + extends SingleSparkJobExecutionStrategy<T> { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetToolsExecutionStrategy.class); + + public ParquetToolsExecutionStrategy( + HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, + ClusteringGroupInfo clusteringOps, + Map<String, String> strategyParams, + boolean preserveHoodieMetadata, + HoodieSchema schema, + TaskContextSupplier taskContextSupplier, + String instantTime) { + LOG.info("Starting clustering operation on input file ids."); + List<ClusteringOperation> clusteringOperations = clusteringOps.getOperations(); + if (clusteringOperations.size() > 1) { + throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName()); + } + + ClusteringOperation clusteringOperation = clusteringOperations.get(0); + String fileId = FSUtils.createNewFileIdPfx(); + String partitionPath = clusteringOperation.getPartitionPath(); Review Comment: 🤖 This checks `> 1` but would let an empty list through, causing an `IndexOutOfBoundsException` at `get(0)` on line 76 instead of a meaningful error. Could you change this to `!= 1` to match the pattern used in `ClusteringIdentityTestExecutionStrategy`? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor.PREV_COMMIT; +import static org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor.TIME_TAKEN; + +/** + * Write handle that is used to work on top of files rather than on individual records. + */ +public class HoodieFileWriteHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> { + + private static final Logger LOG = LoggerFactory.getLogger(HoodieFileWriteHandle.class); + private final StoragePath path; + private final String prevCommit; + + public HoodieFileWriteHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, + StoragePath oldFilePath) { + super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true); + + // Output file path. + this.path = makeNewPath(partitionPath); + // Get the prev commit from existing or old file. + this.prevCommit = FSUtils.getCommitTime(oldFilePath.getName()); + + // Create inProgress marker file + createMarkerFile(partitionPath, path.getName()); + LOG.info("New HoodieFileWriteHandle for partition :" + partitionPath + " with fileId " + fileId); + } + + /** + * Complete writing of the file by creating the success marker file. + * @return WriteStatuses, ideally it will be only one object. + */ + @Override + public List<WriteStatus> close() { + LOG.info("Closing the file " + this.fileId + " as we are done with the file."); + try { + Map<String, Object> executionConfigs = new HashMap<>(); Review Comment: 🤖 The `close()` method can be called even if no file was actually written to `this.path` (e.g. if `executeTools` was never called). In that case, `getPathInfo` and `getRowCount` would fail. Is it worth adding a guard to check the file exists before attempting to generate write status? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
