danny0405 commented on code in PR #12537:
URL: https://github.com/apache/hudi/pull/12537#discussion_r1909691146
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -67,4 +85,69 @@ protected HoodieEngineContext getEngineContext() {
protected HoodieWriteConfig getWriteConfig() {
return this.writeConfig;
}
+
+ protected ClosableIterator<HoodieRecord<T>>
getRecordIteratorWithLogFiles(ClusteringOperation operation, String
instantTime, long maxMemory) {
+ HoodieWriteConfig config = getWriteConfig();
+ HoodieTable table = getHoodieTable();
+ StorageConfiguration<?> storageConf = table.getStorageConf();
+ HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+ String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
+ Option<String[]> partitionFields = tableConfig.getPartitionFields();
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withStorage(table.getStorage())
+ .withBasePath(table.getMetaClient().getBasePath())
+ .withLogFilePaths(operation.getDeltaFilePaths())
+ .withReaderSchema(readerSchemaWithMetaFields)
+ .withLatestInstantTime(instantTime)
+ .withMaxMemorySizeInBytes(maxMemory)
+ .withReverseReader(config.getCompactionReverseLogReadEnabled())
+ .withBufferSize(config.getMaxDFSStreamBufferSize())
+ .withSpillableMapBasePath(config.getSpillableMapBasePath())
+ .withPartition(operation.getPartitionPath())
+ .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
+ .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+ .withRecordMerger(config.getRecordMerger())
+ .withTableMetaClient(table.getMetaClient())
+ .build();
+
+ Option<HoodieFileReader> baseFileReader =
StringUtils.isNullOrEmpty(operation.getDataFilePath())
+ ? Option.empty()
+ : Option.of(getBaseOrBootstrapFileReader(storageConf,
bootstrapBasePath, partitionFields, operation));
+ Option<BaseKeyGenerator> keyGeneratorOp = getKeyGenerator();
+ try {
+ return new HoodieFileSliceReader(baseFileReader, scanner,
readerSchemaWithMetaFields, tableConfig.getPreCombineField(),
config.getRecordMerger(),
+ tableConfig.getProps(),
+ tableConfig.populateMetaFields() ? Option.empty() :
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+ tableConfig.getPartitionFieldProp())), keyGeneratorOp);
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading file slices", e);
+ }
+ }
+
+ protected ClosableIterator<HoodieRecord<T>>
getRecordIteratorWithBaseFileOnly(ClusteringOperation operation) {
+ StorageConfiguration<?> storageConf = getHoodieTable().getStorageConf();
+ HoodieTableConfig tableConfig =
getHoodieTable().getMetaClient().getTableConfig();
+ String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
+ Option<String[]> partitionFields = tableConfig.getPartitionFields();
+ HoodieFileReader baseFileReader =
getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields,
operation);
+
+ Option<BaseKeyGenerator> keyGeneratorOp = getKeyGenerator();
+    // NOTE: Records have to be cloned here to make sure that, if one holds a low-level
+    //       engine-specific payload pointing into a shared, mutable (underlying) buffer,
+    //       we get a clean copy of it, since these records will be shuffled later.
+ ClosableIterator<HoodieRecord> baseRecordsIterator;
+ try {
+ baseRecordsIterator =
baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading base file", e);
+ }
+ return new CloseableMappingIterator(
+ baseRecordsIterator,
+ rec -> ((HoodieRecord)
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields,
writeConfig.getProps(), keyGeneratorOp));
+ }
+
+ protected abstract Option<BaseKeyGenerator> getKeyGenerator();
Review Comment:
Let's remove the interface for `getKeyGenerator` and
`getBaseOrBootstrapFileReader`; they are just utility methods.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]