nsivabalan commented on code in PR #14031:
URL: https://github.com/apache/hudi/pull/14031#discussion_r2401202570
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1552,13 +1565,27 @@ private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, F
    *
    * @param fileGroup File Group for which the file slice belongs to
    * @param fileSlice File Slice which needs to be merged
-   */
-  private FileSlice fetchMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) {
+   * @param currentInstantTime Instant time of the current transaction
+   * @param includeBaseFileUnderInflightCompaction whether to include base file under inflight compaction
+   *                                               when merging file slices
+   */
+  private FileSlice fetchMergedFileSlice(HoodieFileGroup fileGroup,
+                                         FileSlice fileSlice,
+                                         String currentInstantTime,
+                                         boolean includeBaseFileUnderInflightCompaction) {
     // if the file-group is under construction, pick the latest before compaction instant time.
     Option<Pair<String, CompactionOperation>> compactionOpWithInstant = getPendingCompactionOperationWithInstant(fileGroup.getFileGroupId());
     if (compactionOpWithInstant.isPresent()) {
       String compactionInstantTime = compactionOpWithInstant.get().getKey();
+      // Only if includeBaseFileUnderInflightCompaction is true, i.e., the base file from inflight
+      // compaction should be considered in the latest merged file slice, we check the file slice.
Review Comment:
Can you help me understand the need here?
Does the base file refer to the last-but-one file slice's base file, or the phantom base file?
If it's the former, doesn't the typical `getLatestMergedFileSlice` already return this?
If it's the latter, why do we need to return a base file which is not even created yet?
Could you shed some light here, please?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java:
##########
@@ -114,20 +124,60 @@ public static <T> HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexReco
       String fileId = writeStatsByFileIdEntry.getKey();
       List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
       String partition = writeStats.get(0).getPartitionPath();
-      FileSlice previousFileSliceForFileId = fsView.getLatestFileSlice(partition, fileId).orElse(null);
+      StoragePath basePath = dataMetaClient.getBasePath();
+
+      // validate that for a given fileId, either we have 1 parquet file or N log files.
+      AtomicInteger totalParquetFiles = new AtomicInteger();
+      AtomicInteger totalLogFiles = new AtomicInteger();
+      writeStats.stream().forEach(writeStat -> {
+        if (FSUtils.isLogFile(new StoragePath(basePath, writeStat.getPath()))) {
+          totalLogFiles.getAndIncrement();
+        } else {
+          totalParquetFiles.getAndIncrement();
+        }
+      });
+
+      ValidationUtils.checkArgument(!(totalParquetFiles.get() > 0 && totalLogFiles.get() > 0), "Only either of base file or log files are expected for a given file group. "
+          + "Partition " + partition + ", fileId " + fileId);
+      if (totalParquetFiles.get() > 0) {
+        // we should expect only 1 parquet file
+        ValidationUtils.checkArgument(writeStats.size() == 1, "Only one new parquet file expected per file group per commit");
+      }
+      // Instantiate Remote table FSV
+      TableFileSystemView.SliceView sliceView = getSliceView(writeConfig, dataMetaClient);
+      Option<FileSlice> fileSliceOption = sliceView.getLatestMergedFileSliceBeforeOrOn(partition, instantTime, fileId);
       Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice;
-      if (previousFileSliceForFileId == null) {
-        // new file slice, so empty mapping for previous slice
-        recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
-      } else {
+      Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice;
+      if (fileSliceOption.isPresent()) { // if previous file slice is present.
         recordKeyToSecondaryKeyForPreviousFileSlice =
-            getRecordKeyToSecondaryKey(dataMetaClient, readerContextFactory.getContext(), previousFileSliceForFileId, tableSchema, indexDefinition, instantTime, props, false);
+            getRecordKeyToSecondaryKey(dataMetaClient, readerContextFactory.getContext(), fileSliceOption.get(), tableSchema, indexDefinition, instantTime, props, false);
Review Comment:
Oh, looks like I also pushed my change here by mistake.
I have a separate patch for the secondary index here:
https://github.com/apache/hudi/pull/14045/
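Side note on the either-base-or-logs validation added in this hunk (even if it ultimately lands via the other PR): a minimal self-contained sketch of the invariant it enforces. The `isLogFile` name-based heuristic here is a hypothetical stand-in for `FSUtils.isLogFile`, not the actual Hudi check.

```java
import java.util.List;

public class FileGroupWriteCheck {
  // Hypothetical stand-in for FSUtils.isLogFile: Hudi log file names carry a ".log." marker.
  static boolean isLogFile(String path) {
    return path.contains(".log.");
  }

  // Per file group and commit, the write stats may add either exactly one
  // base (parquet) file or N log files, never a mix of the two.
  static boolean isValid(List<String> paths) {
    long logFiles = paths.stream().filter(FileGroupWriteCheck::isLogFile).count();
    long baseFiles = paths.size() - logFiles;
    if (baseFiles > 0 && logFiles > 0) {
      return false; // mixed base + log files in one commit: rejected
    }
    return baseFiles == 0 || paths.size() == 1; // at most one new base file
  }
}
```

The real code raises via `ValidationUtils.checkArgument` instead of returning a boolean; the boolean form just makes the two failure modes (mix, or more than one base file) easy to see.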
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -763,6 +763,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option("hoodie.write.merge.handle.class", "org.apache.hudi.io.FileGroupReaderBasedMergeHandle")
+      .option("hoodie.index.type","SIMPLE")
+      .option("hoodie.metadata.enable","false")
Review Comment:
to be cleaned up
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -166,6 +166,24 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
    */
   Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
+  /**
+   * Stream all "merged" file slices before on an instant time including files under inflight instants.
Review Comment:
minor: `Stream all latest merged ...`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1043,22 +1043,35 @@ public final Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(Str
   @Override
   public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionStr, String maxInstantTime) {
+    return getLatestMergedFileSliceBeforeOrOnInternal(partitionStr, maxInstantTime, maxInstantTime, false);
+  }
+
+  @Override
+  public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOnIncludingInflight(String partitionStr, String maxInstantTime, String currentInstantTime) {
Review Comment:
Do we have UTs for these?
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java:
##########
@@ -374,6 +374,7 @@ public boolean delete(Path f, boolean recursive) throws IOException {
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
+    System.out.println("!!!!!!! listStatus for " + f);
Review Comment:
all these need to be cleaned up
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java:
##########
@@ -856,4 +856,260 @@ private static Stream<Arguments> mapKeyNoSeparatorToFileGroupIndexTestCases() {
         )
     );
   }
+
+  /*
+  @Disabled
+  @Test
+  public void testConvertMetadataToPartitionStatRecordsWithoutColumnStats() throws Exception {
Review Comment:
Why is this disabled?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -166,6 +166,24 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
    */
   Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
+  /**
+   * Stream all "merged" file slices before on an instant time including files under inflight instants.
Review Comment:
`before or on ...`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -166,6 +166,24 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
    */
   Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
+  /**
+   * Stream all "merged" file slices before on an instant time including files under inflight instants.
+   * If a filegroup has a pending compaction request,
+   * (1) if the base file from compaction is not present, the file slice before and after
+   * compaction request instant is merged and returned;
+   * (2) if the base file from compaction is present, and the compaction is inflight, only if the
Review Comment:
Now I get the rationale for the previous fix.
So, essentially, if the current client is actually the compaction, and is polling `getLatestMergedFileSlice` including inflight, we should return the base file created by the inflight compaction and ignore the entire previous file slice (prev base file + all logs from the previous slice).
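To confirm my reading, here is a tiny self-contained sketch of that selection rule (simplified types and hypothetical names, not the actual Hudi API):

```java
import java.util.Optional;

public class MergedSliceRule {
  // Simplified stand-in for a file slice; hasBaseFile means the compaction
  // has already written out its new base file.
  record Slice(String instantTime, boolean hasBaseFile) {}

  // When a compaction is inflight and has produced its base file, only the
  // caller that owns that compaction (currentInstantTime equals the
  // compaction's instant, with includeInflight=true) should see the new base
  // file alone; every other reader keeps the merged pre-compaction view.
  static Slice select(Slice mergedPreCompaction, Optional<Slice> compacted,
                      String compactionInstantTime, String currentInstantTime,
                      boolean includeInflight) {
    if (includeInflight
        && compacted.isPresent()
        && compacted.get().hasBaseFile()
        && compactionInstantTime.equals(currentInstantTime)) {
      return compacted.get(); // compaction writer: new base file, old slice ignored
    }
    return mergedPreCompaction; // others: slices before/after the compaction request, merged
  }
}
```

If that matches the intent, it might be worth stating this ownership condition (current instant == compaction instant) explicitly in the javadoc.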
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java:
##########
@@ -194,6 +200,9 @@ public List<StoragePathInfo> listFiles(StoragePath path) throws IOException {
   @Override
   public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList) throws IOException {
+    if (pathList.stream().anyMatch(path -> path.getParent().getName().equals("test_table"))) {
Review Comment:
same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]