aokolnychyi commented on a change in pull request #2984:
URL: https://github.com/apache/iceberg/pull/2984#discussion_r690519631
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -72,12 +62,7 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
@Override
public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
- // Called in Job manager, so it is OK to load table from catalog.
- tableLoader.open();
Review comment:
Okay, I've submitted #2987 for Flink changes alone.
##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
##########
@@ -141,7 +144,7 @@
// TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
checkResiduals(task);
}
- splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+ splits.add(new IcebergSplit(SerializableTable.copyOf(table), conf, task));
Review comment:
Good catch, @kbendick! Moved it outside of the loop in #2988.
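The fix described here hoists `SerializableTable.copyOf(table)` out of the per-task loop, so every split shares one table copy instead of each split making its own. The sketch below models that pattern in plain Java. `TableCopy`, `Split`, `copyOf` and `planSplits` are hypothetical stand-ins for the Iceberg types. Sharing one instance is only safe on the assumption that the copy is read-only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Table, SerializableTable.copyOf and IcebergSplit;
// they model only the cost of copying, not real Iceberg behavior.
public class HoistCopyExample {

  // Counts how many times the (expensive) table copy is made.
  static int copies = 0;

  record TableCopy(String name) { }

  record Split(TableCopy table, String task) { }

  // Stand-in for building a serializable, read-only snapshot of the table.
  static TableCopy copyOf(String tableName) {
    copies++;
    return new TableCopy(tableName);
  }

  // Copy once, before the loop, and share the same read-only copy across all splits.
  static List<Split> planSplits(String tableName, List<String> tasks) {
    TableCopy shared = copyOf(tableName);
    List<Split> splits = new ArrayList<>();
    for (String task : tasks) {
      splits.add(new Split(shared, task));
    }
    return splits;
  }

  public static void main(String[] args) {
    List<Split> splits = planSplits("db.tbl", List.of("t1", "t2", "t3"));
    System.out.println(splits.size() + " splits, " + copies + " copy"); // 3 splits, 1 copy
  }
}
```

With the copy inside the loop, planning N tasks would make N copies. After hoisting, it makes one.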
##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -38,6 +38,11 @@ private MetadataColumns() {
Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
public static final NestedField IS_DELETED = NestedField.required(
Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
+ public static final NestedField SPEC = NestedField.required(
+ Integer.MAX_VALUE - 4, "_spec", Types.IntegerType.get(), "Spec ID to which a row belongs to");
+ public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
+ public static final String PARTITION_COLUMN_NAME = "_partition";
+ public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to";
Review comment:
Sure, I'll add a comment above.
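The comment being added presumably explains why `_partition` is declared as a bare ID, name and doc rather than as a static `NestedField`. As the later `metadataColumn(Table, String)` hunk shows, its type comes from `Partitioning.partitionType(table)`, so it depends on the table. The following sketch models the reserved-ID layout in plain Java. The `_file` ID of `Integer.MAX_VALUE - 1` is an assumption because it is not shown in this hunk. `ReservedIds` and `isMetadataFieldId` are hypothetical names.

```java
import java.util.Set;

// Simplified model of the reserved metadata field IDs, counting down from Integer.MAX_VALUE.
public class ReservedIds {
  static final int FILE_PATH_ID = Integer.MAX_VALUE - 1;    // _file (assumed, not in this hunk)
  static final int ROW_POSITION_ID = Integer.MAX_VALUE - 2; // _pos
  static final int IS_DELETED_ID = Integer.MAX_VALUE - 3;   // _deleted
  static final int SPEC_ID = Integer.MAX_VALUE - 4;         // _spec

  // _partition reserves only an ID (plus a name and doc): its struct type depends on
  // the table's partition specs, so no static NestedField can be built for it.
  static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;

  // java.util.Set.of throws IllegalArgumentException on duplicate elements,
  // so an accidental ID collision fails at class initialization.
  static final Set<Integer> META_IDS = Set.of(
      PARTITION_COLUMN_ID, SPEC_ID, FILE_PATH_ID, ROW_POSITION_ID, IS_DELETED_ID);

  static boolean isMetadataFieldId(int id) {
    return META_IDS.contains(id);
  }

  public static void main(String[] args) {
    System.out.println(isMetadataFieldId(Integer.MAX_VALUE - 5)); // true
    System.out.println(isMetadataFieldId(1));                     // false
  }
}
```

Guava's `ImmutableSet.of`, used in the diff, silently drops duplicates instead. The duplicate check shown here is a property of the JDK factory only.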
##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -49,26 +54,40 @@ private MetadataColumns() {
public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
+ SPEC.name(), SPEC,
FILE_PATH.name(), FILE_PATH,
ROW_POSITION.name(), ROW_POSITION,
IS_DELETED.name(), IS_DELETED);
- private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
- .collect(ImmutableSet.toImmutableSet());
+ private static final Set<Integer> META_IDS = ImmutableSet.of(
+ PARTITION_COLUMN_ID,
+ SPEC.fieldId(),
+ FILE_PATH.fieldId(),
+ ROW_POSITION.fieldId(),
+ IS_DELETED.fieldId()
+ );
public static Set<Integer> metadataFieldIds() {
return META_IDS;
}
- public static NestedField get(String name) {
- return META_COLUMNS.get(name);
+ public static NestedField metadataColumn(Table table, String name) {
+ if (name.equals(PARTITION_COLUMN_NAME)) {
+ return Types.NestedField.optional(
+ PARTITION_COLUMN_ID,
+ PARTITION_COLUMN_NAME,
+ Partitioning.partitionType(table),
+ PARTITION_COLUMN_DOC);
+ } else {
+ return META_COLUMNS.get(name);
+ }
}
public static boolean isMetadataColumn(String name) {
- return META_COLUMNS.containsKey(name);
+ return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
Review comment:
I did that first, but `META_COLUMNS` is an immutable map that does not allow null keys or values. I am reluctant to switch to a mutable map, so I added this condition here. I hope we will be able to use `metadataColumn(table, name)` in other places, so that this workaround stays confined to `MetadataColumns`.
It does look ugly, though, I agree.
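The constraint can be illustrated without Guava. Like Guava's `ImmutableMap`, `java.util.Map.of` rejects nulls, so `_partition` cannot be registered with a placeholder `null` field. The sketch below uses `Map.of` to stay dependency-free. `MetadataLookup` is a hypothetical class, and its string values stand in for the `NestedField` constants. Because `_partition` has no table-independent field to store, it is matched by name before the map lookup, as in the diff.

```java
import java.util.Map;

// Simplified model of MetadataColumns.isMetadataColumn; string values stand in
// for the static NestedField constants.
public class MetadataLookup {
  static final String PARTITION_COLUMN_NAME = "_partition";

  // Map.of, like Guava's ImmutableMap, rejects null keys and values,
  // so "_partition" cannot be stored here with a null placeholder.
  static final Map<String, String> META_COLUMNS = Map.of(
      "_spec", "SPEC",
      "_file", "FILE_PATH",
      "_pos", "ROW_POSITION",
      "_deleted", "IS_DELETED");

  static boolean isMetadataColumn(String name) {
    // The table-dependent partition column is special-cased by name.
    return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
  }

  public static void main(String[] args) {
    System.out.println(isMetadataColumn("_partition")); // true
    System.out.println(isMetadataColumn("_pos"));       // true
    System.out.println(isMetadataColumn("id"));         // false
    try {
      Map.of(PARTITION_COLUMN_NAME, (String) null);
    } catch (NullPointerException e) {
      System.out.println("Map.of rejects null values");
    }
  }
}
```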
##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -49,26 +54,40 @@ private MetadataColumns() {
public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
+ SPEC.name(), SPEC,
Review comment:
I'll switch. I wanted to reduce the number of modified lines, but it is probably better to follow the order of definition.
##########
File path: core/src/main/java/org/apache/iceberg/Partitioning.java
##########
@@ -246,4 +246,45 @@ private static boolean equivalentIgnoringNames(PartitionField field, PartitionFi
private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
return t1.equals(t2) || t1.equals(Transforms.alwaysNull()) || t2.equals(Transforms.alwaysNull());
}
+
+ /**
+ * Adapts the provided partition data to match the table partition type built using {@link #partitionType(Table)}.
+ *
+ * @param partitionType a table partition type that includes partition fields from all specs
Review comment:
Yeah, it will be set to null, since all partition fields are optional and the table partition type is a union of the partition types of all specs.
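Here is a minimal sketch of the behavior described above. It assumes the union partition type can be modeled as an ordered list of partition field IDs and a single spec's partition tuple as a map from field ID to value. `PartitionCoercion.coerce` is a hypothetical helper, not the Iceberg method under review, and the field IDs 1000 and 1001 are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical model: the table partition type is an ordered list of partition field IDs
// drawn from all specs; one spec's partition tuple maps its own field IDs to values.
public class PartitionCoercion {

  static Object[] coerce(List<Integer> tablePartitionFieldIds, Map<Integer, ?> specPartition) {
    Object[] result = new Object[tablePartitionFieldIds.size()];
    for (int i = 0; i < result.length; i++) {
      // Fields the row's spec does not define stay null; this is valid only
      // because every field in the union partition type is optional.
      result[i] = specPartition.get(tablePartitionFieldIds.get(i));
    }
    return result;
  }

  public static void main(String[] args) {
    // Illustrative: spec 0 partitions by field 1000, and a later spec adds field 1001.
    List<Integer> unionIds = List.of(1000, 1001);
    Object[] fromSpec0 = coerce(unionIds, Map.of(1000, "2021-08-01"));
    System.out.println(Arrays.toString(fromSpec0)); // [2021-08-01, null]
  }
}
```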
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.