RussellSpitzer commented on code in PR #4560:
URL: https://github.com/apache/iceberg/pull/4560#discussion_r856568138
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -93,16 +93,71 @@ private static StaticDataTask.Row
convertPartition(Partition partition) {
return StaticDataTask.Row.of(partition.key, partition.recordCount,
partition.fileCount, partition.specId);
}
- private static Iterable<Partition> partitions(StaticTableScan scan) {
+ private static Iterable<Partition> partitions(Table table, StaticTableScan
scan) {
CloseableIterable<FileScanTask> tasks = planFiles(scan);
+ Types.StructType normalizedPartitionType =
Partitioning.partitionType(table);
+ PartitionMap partitions = new PartitionMap();
+
+ // cache a position map needed by each partition spec to normalize
partitions to final schema
+ Map<Integer, Integer[]> originalPartitionFieldPositionsBySpec =
+ Maps.newHashMapWithExpectedSize(table.specs().size());
- PartitionMap partitions = new
PartitionMap(scan.table().spec().partitionType());
for (FileScanTask task : tasks) {
- partitions.get(task.file().partition()).update(task.file());
+ PartitionData original = (PartitionData) task.file().partition();
+ Integer[] originalPositions =
originalPartitionFieldPositionsBySpec.computeIfAbsent(
+ task.spec().specId(), specId -> originalPositions(table, specId,
normalizedPartitionType));
+ PartitionData normalized = normalizePartition(original,
normalizedPartitionType, originalPositions);
+ partitions.get(normalized).update(task.file());
}
return partitions.all();
}
+ /**
+ * Returns an array of original partition field positions, indexed by
normalized partition field positions
+ */
+ private static Integer[] originalPositions(Table table,
+ int originalSpecId,
+ Types.StructType
normalizedPartitionType) {
+ Types.StructType originalType =
table.specs().get(originalSpecId).partitionType();
+
+ Map<Integer, Integer> originalFieldIdsToPosition =
Maps.newHashMapWithExpectedSize(
+ originalType.fields().size());
+ int originalPartitionIndex = 0;
+ for (Types.NestedField originalField : originalType.fields()) {
+ originalFieldIdsToPosition.put(originalField.fieldId(),
originalPartitionIndex);
+ originalPartitionIndex++;
+ }
+
+ return normalizedPartitionType.fields().stream()
+ .map(f -> originalFieldIdsToPosition.get(f.fieldId()))
+ .toArray(Integer[]::new);
+ }
+
+ /**
+ * Convert a partition data written by an old spec, to table's normalized
partition type, which is a common partition
+ * type for all specs of the table.
+ * @param originalPartition un-normalized partition data
+ * @param normalizedPartitionType table's normalized partition type {@link
Partitioning#partitionType(Table)}
+ * @param originalPartitionFieldPositions an array of positional indexes of
the spec's partition fields indexed by
+ * position in the normalized
partition schema
+ * @return the normalized partition data
+ */
+ private static PartitionData normalizePartition(PartitionData
originalPartition,
+ Types.StructType
normalizedPartitionType,
+ Integer[]
originalPartitionFieldPositions) {
+ PartitionData normalizedPartition = new
PartitionData(normalizedPartitionType);
+
+ IntStream.range(0,
normalizedPartitionType.fields().size()).forEach(normalizedPartitionFieldIndex
-> {
Review Comment:
I would just use a normal for loop here; the IntStream feels more
verbose than is useful. I wish there was a zipWithIndex or Enumerate, but Java
is still not ready for concise versions of those :(
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]