voonhous commented on code in PR #18532:
URL: https://github.com/apache/hudi/pull/18532#discussion_r3409020514


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -479,31 +479,29 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
               String partitionStatName = entry.getKey();
               List<HoodieWriteStat> writeStats = entry.getValue();
 
-              HashMap<String, Long> updatedFilesToSizesMapping =
-                  writeStats.stream().reduce(new HashMap<>(writeStats.size()),
-                      (map, stat) -> {
-                        String pathWithPartition = stat.getPath();
-                        if (pathWithPartition == null) {
-                          // Empty partition
-                          log.warn("Unable to find path in write stat to update metadata table {}", stat);
-                          return map;
-                        }
-
-                        String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);
-
-                        // Since write-stats are coming in no particular order, if the same
-                        // file have previously been appended to w/in the txn, we simply pick max
-                        // of the sizes as reported after every write, since file-sizes are
-                        // monotonically increasing (ie file-size never goes down, unless deleted)
-                        map.merge(fileName, stat.getFileSizeInBytes(), Math::max);
-
-                        Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
-                        if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
-                          cdcPathAndSizes.forEach((key, value) -> map.put(FSUtils.getFileName(key, partitionStatName), value));
-                        }
-                        return map;
-                      },
-                      CollectionUtils::combine);
+              HashMap<String, Long> updatedFilesToSizesMapping = new HashMap<>(writeStats.size());

Review Comment:
   Good question. I looked at `toMap` but it doesn't fit this aggregation 
cleanly, for three reasons:
   
   1. Each `HoodieWriteStat` contributes a *variable* number of entries: one 
entry mapping the data file name to `getFileSizeInBytes()`, plus zero-to-N CDC 
entries from `getCdcStats()`. `toMap` emits exactly one entry per stream 
element, so this would first need a `flatMap` into `Map.entry(...)`.
   
   2. The two groups need *different* collision policies in the same map: data 
files take `Math::max`, CDC entries take last-wins (`put`). `toMap` only 
accepts a single merge function. For example, with `statA{f1=100, c1=10}` and 
`statB{f1=80, c1=5}`, the current code yields `f1=100, c1=5`; neither a single 
`Math::max` merge nor a single last-wins merge reproduces that.
   
   3. The null-path case logs a warning and skips. In a stream, that warning would
have to live in a side-effecting `filter`/`peek`, i.e. the same kind of stream
misuse this PR is removing.
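   To make point 2 concrete, here is a minimal standalone sketch of the `flatMap` + `toMap` shape. It assumes Java 9+ (`Map.entry`, `List.of`), and `Stat` is a hypothetical stand-in for `HoodieWriteStat`, not the Hudi class. Fed the `statA`/`statB` example, neither single merge function produces `f1=100, c1=5`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapCollisionDemo {

  // Hypothetical stand-in for HoodieWriteStat (NOT the Hudi class):
  // a data-file name, its size, and CDC file name -> size.
  static final class Stat {
    final String fileName;
    final long size;
    final Map<String, Long> cdc;

    Stat(String fileName, long size, Map<String, Long> cdc) {
      this.fileName = fileName;
      this.size = size;
      this.cdc = cdc;
    }
  }

  // Each stat is flattened into one data entry plus zero-to-N CDC entries;
  // toMap then applies the SAME merge function to every colliding key.
  static Map<String, Long> viaToMap(List<Stat> stats, BinaryOperator<Long> merge) {
    return stats.stream()
        .flatMap(s -> Stream.concat(
            Stream.of(Map.entry(s.fileName, s.size)),
            s.cdc.entrySet().stream()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, merge, HashMap::new));
  }

  public static void main(String[] args) {
    List<Stat> stats = List.of(
        new Stat("f1", 100L, Map.of("c1", 10L)),   // statA
        new Stat("f1", 80L, Map.of("c1", 5L)));    // statB
    // Max everywhere: f1 is right (100) but c1 keeps 10 instead of 5.
    System.out.println(viaToMap(stats, Math::max));
    // Last-wins everywhere: c1 is right (5) but f1 drops to 80.
    System.out.println(viaToMap(stats, (oldV, newV) -> newV));
  }
}
```

Getting both right would need the merge function to know whether a key came from a data file or a CDC file, which `toMap` does not expose.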
   
   So I kept the plain for-loop, which expresses all three directly. Happy to 
revisit if you see a clean `toMap` form I'm missing.
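   For comparison, a minimal standalone sketch of the plain for-loop shape. `Stat` is a hypothetical stand-in for `HoodieWriteStat`, `java.util.logging` replaces the real logger, and `FSUtils.getFileName` is omitted, so paths are treated directly as file names:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public class FilesToSizesLoop {
  private static final Logger LOG = Logger.getLogger(FilesToSizesLoop.class.getName());

  // Hypothetical stand-in for HoodieWriteStat (NOT the Hudi class):
  // path is null for an empty partition; cdc may be null or empty.
  static final class Stat {
    final String path;
    final long size;
    final Map<String, Long> cdc;

    Stat(String path, long size, Map<String, Long> cdc) {
      this.path = path;
      this.size = size;
      this.cdc = cdc;
    }
  }

  static Map<String, Long> filesToSizes(List<Stat> writeStats) {
    Map<String, Long> map = new HashMap<>(writeStats.size());
    for (Stat stat : writeStats) {
      if (stat.path == null) {
        // Empty partition: warn and skip, with no side effect hidden in a stream stage.
        LOG.warning("Unable to find path in write stat; skipping");
        continue;
      }
      // Data file: sizes only grow within a txn, so keep the largest reported size.
      map.merge(stat.path, stat.size, Math::max);
      // CDC files: plain put, so the last write stat wins.
      if (stat.cdc != null) {
        map.putAll(stat.cdc);
      }
    }
    return map;
  }

  public static void main(String[] args) {
    List<Stat> stats = List.of(
        new Stat("f1", 100L, Map.of("c1", 10L)),   // statA
        new Stat("f1", 80L, Map.of("c1", 5L)),     // statB
        new Stat(null, 0L, null));                 // empty partition, skipped
    System.out.println(filesToSizes(stats));       // f1=100, c1=5 (HashMap order may vary)
  }
}
```

Each of the three requirements maps to one visible statement (`continue`, `merge(..., Math::max)`, `putAll`), which is the readability argument for the loop.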


