ayushtkn commented on code in PR #6565:
URL: https://github.com/apache/hive/pull/6565#discussion_r3502739534
##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java:
##########
@@ -154,6 +154,23 @@ public static Type convert(TypeInfo typeInfo, String
defaultValue) {
return HiveSchemaConverter.convert(typeInfo, false, defaultValue);
}
+ public static Type applyInitialDefaultsToStruct(Type type) {
+ Types.StructType struct = type.asStructType();
+ return Types.StructType.of(
+
struct.fields().stream().map(HiveSchemaUtil::applyInitialDefaultsToStructField).toList());
+ }
+
+ private static Types.NestedField
applyInitialDefaultsToStructField(Types.NestedField field) {
+ Types.NestedField.Builder builder = Types.NestedField.from(field);
+ if (field.type().isStructType()) {
+ builder.ofType(applyInitialDefaultsToStruct(field.type()));
+ }
+ if (field.initialDefaultLiteral() == null && field.writeDefaultLiteral()
!= null) {
+ builder.withInitialDefault(field.writeDefaultLiteral());
+ }
Review Comment:
Why are we using `writeDefault` when `initialDefault` isn't set?
##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java:
##########
@@ -429,6 +446,24 @@ public static void setDefaultValues(Record record,
List<Types.NestedField> missi
}
}
+ /**
+ * Backfills struct column that is null on read using nested {@code
initialDefault} metadata.
+ * This applies to rows written before {@code ADD COLUMNS} added the struct.
+ * Spec allows struct defaults as {@code {}} (see
https://iceberg.apache.org/spec/#default-values), but
+ * {@code UpdateSchema} add column only supports primitives today;
+ * if empty structs are allowed, this backfill can be removed.
+ */
+ public static void backfillStructInitialDefaults(Record iceRecord,
List<Types.NestedField> columns) {
+ for (Types.NestedField field : columns) {
+ if (field.type().isStructType() && iceRecord.getField(field.name()) ==
null) {
Review Comment:
Can we use the field ID here instead of the name? I'm worried about the case where a field with
the same name was dropped earlier and then added again as a struct.
##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java:
##########
@@ -458,6 +493,42 @@ private static Record
buildStructWithDefaults(Types.StructType structType) {
return hasAnyDefault ? nestedRecord : null;
}
+ private static Record buildStructWithInitialDefaults(Types.StructType
structType) {
Review Comment:
This is identical to `buildStructWithDefaults` except that it uses `initialDefault` instead of
`writeDefault`; please refactor so the two share the same logic.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java:
##########
@@ -744,12 +744,14 @@ private void
handleAddColumns(org.apache.hadoop.hive.metastore.api.Table hmsTabl
boolean isORc = isOrcFileFormat(hmsTable);
for (FieldSchema addedCol : addedCols) {
String defaultValue = defaultValues.get(addedCol.getName());
- Type type =
HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()),
defaultValue);
- Literal<Object> defaultVal = Optional.ofNullable(defaultValue).filter(v
-> !type.isStructType())
- .map(v -> Expressions.lit(HiveSchemaUtil.getDefaultValue(v,
type))).orElse(null);
-
+ Type baseType =
HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()),
defaultValue);
+ final Type resolvedType = (!isORc && baseType.isStructType()) ?
+ HiveSchemaUtil.applyInitialDefaultsToStruct(baseType) :
+ baseType;
+ Literal<Object> defaultVal = Optional.ofNullable(defaultValue).filter(v
-> !resolvedType.isStructType())
+ .map(v -> Expressions.lit(HiveSchemaUtil.getDefaultValue(v,
resolvedType))).orElse(null);
// ORC doesn't have support for initialDefault from iceberg layer, we
only need to set default for writeDefault.
- updateSchema.addColumn(addedCol.getName(), type, addedCol.getComment(),
isORc ? null : defaultVal);
+ updateSchema.addColumn(addedCol.getName(), resolvedType,
addedCol.getComment(), isORc ? null : defaultVal);
Review Comment:
I don't follow what you are trying to achieve here. This call already passed the
`default`:
```
Type type =
HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()),
defaultValue);
```
Simply setting `initialDefault` along with `writeDefault` should have
solved the problem.
As of now, it only sets `writeDefault` while converting the schema:
```
Type type =
HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()),
defaultValue);
```
You are doing the parsing (and the related conversions) multiple times.
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java:
##########
@@ -64,6 +68,41 @@ public void readBatch(int total, ColumnVector col, TypeInfo
typeInfo) throws IOE
if (typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
fillPrimitive(col, (PrimitiveTypeInfo) typeInfo, defaultValue);
+ } else if (typeInfo.getCategory() == ObjectInspector.Category.STRUCT) {
+ fillStruct(col, (StructTypeInfo) typeInfo, defaultValue);
+ } else {
+ throw new IOException("Unsupported type category in DummyColumnReader: "
+ typeInfo.getCategory());
+ }
+ }
+
+ private void fillStruct(ColumnVector col, StructTypeInfo structTypeInfo,
Object defaultValue) throws IOException {
+ StructColumnVector structCol = (StructColumnVector) col;
+ List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+ List<TypeInfo> fieldTypes = structTypeInfo.getAllStructFieldTypeInfos();
+ Map<String, Object> fieldDefaults = defaultValue instanceof Map ?
(Map<String, Object>) defaultValue : null;
+
+ for (int i = 0; i < fieldNames.size(); i++) {
+ Object fieldDefault = fieldDefaults != null ?
fieldDefaults.get(fieldNames.get(i)) : null;
+ fillStructField(structCol.fields[i], fieldTypes.get(i), fieldDefault);
+ }
+ }
+
+ private void fillStructField(ColumnVector col, TypeInfo typeInfo, Object
fieldDefault) throws IOException {
+ if (fieldDefault == null) {
+ col.isRepeating = true;
+ Arrays.fill(col.isNull, true);
+ col.noNulls = false;
+ return;
+ }
+
+ col.isRepeating = true;
+ col.noNulls = true;
+ col.isNull[0] = false;
+
+ if (typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ fillPrimitive(col, (PrimitiveTypeInfo) typeInfo, fieldDefault);
+ } else if (typeInfo.getCategory() == ObjectInspector.Category.STRUCT) {
+ fillStruct(col, (StructTypeInfo) typeInfo, fieldDefault);
Review Comment:
This duplicates the `// Case 2: We have a default → fill
with constant value` branch above; refactor it rather than duplicating the logic.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java:
##########
@@ -173,7 +174,22 @@ private CloseableIterable openGeneric(FileScanTask task,
Schema readSchema) {
default -> throw new UnsupportedOperationException(
String.format("Cannot read %s file: %s", file.format().name(),
file.location()));
};
- return applyResidualFiltering(iterable, residual, readSchema);
+ return applyResidualFiltering(withStructInitialDefaultBackfill(iterable,
readSchema), residual, readSchema);
+ }
+
+ private CloseableIterable<T>
withStructInitialDefaultBackfill(CloseableIterable<T> iterable, Schema
readSchema) {
+ boolean needsBackfill = readSchema.columns().stream()
+ .filter(field -> field.type().isStructType())
+ .anyMatch(field ->
!HiveSchemaUtil.getStructInitialDefaults(field.type().asStructType()).isEmpty());
+ if (!needsBackfill) {
+ return iterable;
+ }
+ return CloseableIterable.transform(iterable, row -> {
+ if (row instanceof Record curIceRecord) {
+ HiveSchemaUtil.backfillStructInitialDefaults(curIceRecord,
readSchema.columns());
+ }
+ return row;
+ });
Review Comment:
How many times are we walking over the schema here? This is pretty
confusing; I'm not sure what you are trying to achieve.
For `needsBackfill` you go through the schema, call
`HiveSchemaUtil.getStructInitialDefaults` to populate the entire map, and then throw
it away?
##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java:
##########
@@ -429,6 +446,24 @@ public static void setDefaultValues(Record record,
List<Types.NestedField> missi
}
}
+ /**
+ * Backfills struct column that is null on read using nested {@code
initialDefault} metadata.
+ * This applies to rows written before {@code ADD COLUMNS} added the struct.
+ * Spec allows struct defaults as {@code {}} (see
https://iceberg.apache.org/spec/#default-values), but
+ * {@code UpdateSchema} add column only supports primitives today;
Review Comment:
> add column only supports primitives today;
What? Then how come we added a struct column, and this entire `initialDefault`
initiative, here?
##########
iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveSchemaUtil.java:
##########
@@ -240,4 +245,102 @@ private void assertEquals(Type expected, Type actual) {
}
}
}
+
+ @Test
+ void testBackfillStructInitialDefaults() {
Review Comment:
These tests aren't required; the q-test is enough.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java:
##########
@@ -173,7 +174,22 @@ private CloseableIterable openGeneric(FileScanTask task,
Schema readSchema) {
default -> throw new UnsupportedOperationException(
String.format("Cannot read %s file: %s", file.format().name(),
file.location()));
};
- return applyResidualFiltering(iterable, residual, readSchema);
+ return applyResidualFiltering(withStructInitialDefaultBackfill(iterable,
readSchema), residual, readSchema);
+ }
+
+ private CloseableIterable<T>
withStructInitialDefaultBackfill(CloseableIterable<T> iterable, Schema
readSchema) {
+ boolean needsBackfill = readSchema.columns().stream()
+ .filter(field -> field.type().isStructType())
+ .anyMatch(field ->
!HiveSchemaUtil.getStructInitialDefaults(field.type().asStructType()).isEmpty());
+ if (!needsBackfill) {
+ return iterable;
+ }
+ return CloseableIterable.transform(iterable, row -> {
Review Comment:
The `initialDefault` value will be the same for all records that are missing the
field. `buildStructWithInitialDefaults` recursively parses the schema and calls
`convertToWriteType()` to build the default record from scratch for every
single missing struct on every single row. For a table with millions of rows,
this continuous object creation and recursive schema parsing will severely
degrade read performance.
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java:
##########
@@ -64,6 +68,41 @@ public void readBatch(int total, ColumnVector col, TypeInfo
typeInfo) throws IOE
if (typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
fillPrimitive(col, (PrimitiveTypeInfo) typeInfo, defaultValue);
+ } else if (typeInfo.getCategory() == ObjectInspector.Category.STRUCT) {
+ fillStruct(col, (StructTypeInfo) typeInfo, defaultValue);
+ } else {
+ throw new IOException("Unsupported type category in DummyColumnReader: "
+ typeInfo.getCategory());
+ }
+ }
+
+ private void fillStruct(ColumnVector col, StructTypeInfo structTypeInfo,
Object defaultValue) throws IOException {
+ StructColumnVector structCol = (StructColumnVector) col;
+ List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+ List<TypeInfo> fieldTypes = structTypeInfo.getAllStructFieldTypeInfos();
+ Map<String, Object> fieldDefaults = defaultValue instanceof Map ?
(Map<String, Object>) defaultValue : null;
Review Comment:
Why not use an empty map rather than `null`? Otherwise you are checking
`fieldDefaults != null` on every iteration of the loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]