codope commented on code in PR #12511:
URL: https://github.com/apache/hudi/pull/12511#discussion_r1900562094
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1487,4 +1488,20 @@ public static Comparable<?>
unwrapAvroValueWrapper(Object avroValueWrapper) {
throw new UnsupportedOperationException(String.format("Unsupported type
of the value (%s)", avroValueWrapper.getClass()));
}
}
+
+ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper,
String wrapperClassName) {
+ if (avroValueWrapper == null) {
+ return null;
+ } else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
+ return LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0));
+ } else if
(TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
+ Instant instant = microsToInstant((Long)((Record)
avroValueWrapper).get(0));
Review Comment:
Do we need to cast for Date and TimestampMicros wrappers as well? I thought
the problem was only for the Decimal wrapper.
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1487,4 +1488,20 @@ public static Comparable<?>
unwrapAvroValueWrapper(Object avroValueWrapper) {
throw new UnsupportedOperationException(String.format("Unsupported type
of the value (%s)", avroValueWrapper.getClass()));
}
}
+
+ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper,
String wrapperClassName) {
Review Comment:
let's add a unit test (UT) for this method
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -772,6 +777,30 @@ private boolean getDefaultMetadataEnable(EngineType
engineType) {
throw new HoodieNotSupportedException("Unsupported engine " +
engineType);
}
}
+
+ private boolean getDefaultColStatsEnable(EngineType engineType) {
+ switch (engineType) {
+ case SPARK:
+ return true;
+ case FLINK:
+ case JAVA:
+ return false;
+ default:
+ throw new HoodieNotSupportedException("Unsupported engine " +
engineType);
+ }
+ }
+
+ private boolean getDefaultPartitionStatsEnable(EngineType engineType) {
+ switch (engineType) {
+ case SPARK:
+ return true;
+ case FLINK:
+ case JAVA:
+ return false;
Review Comment:
Let's add jira in the comment
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -245,13 +245,19 @@ class ColumnStats {
// with the values from this record
targetFields.forEach(field -> {
ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(),
ignored -> new ColumnStats());
-
Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(recordSchema,
field.name());
Object fieldValue;
if (record.getRecordType() == HoodieRecordType.AVRO) {
fieldValue = HoodieAvroUtils.getRecordColumnValues(record, new
String[]{field.name()}, recordSchema, false)[0];
+ if (fieldSchema.getType() == Schema.Type.INT &&
fieldSchema.getLogicalType() != null &&
fieldSchema.getLogicalType().getName().equals("date")) {
Review Comment:
can use `fieldSchema.getLogicalType() == LogicalTypes.date()` instead of
hardcoded string.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1413,18 +1426,26 @@ private static List<String>
getColumnsToIndex(HoodieTableConfig tableConfig,
*/
private static Stream<String>
getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
Either<List<String>, Lazy<Option<Schema>>> tableSchema,
+
boolean freshTable,
Option<HoodieRecordType> recordType) {
List<String> columnsToIndex =
metadataConfig.getColumnsEnabledForColumnStatsIndex();
if (!columnsToIndex.isEmpty()) {
+ if (freshTable) {
+ return columnsToIndex.stream();
+ }
// filter for top level fields here.
List<String> topLevelFields = tableSchema.isLeft() ?
tableSchema.asLeft() :
(tableSchema.asRight().get().map(schema ->
schema.getFields().stream().map(field ->
field.name()).collect(toList())).orElse(new ArrayList<String>()));
return columnsToIndex.stream().filter(fieldName ->
!META_COL_SET_TO_INDEX.contains(fieldName) && (topLevelFields.isEmpty() ||
topLevelFields.contains(fieldName)));
}
- if (tableSchema.isLeft()) {
- return getFirstNFieldNames(tableSchema.asLeft().stream(),
metadataConfig.maxColumnsToIndexForColStats());
+ if (freshTable) {
+ return new ArrayList<String>().stream();
Review Comment:
`return Stream.empty()`
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -408,7 +408,8 @@ public void testRollbackCommit() throws Exception {
.withRollbackUsingMarkers(false)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
-
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();
Review Comment:
let's add jira id in the code comments. We will have to revisit these tests
later.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2550,9 +2578,20 @@ public static HoodieData<HoodieRecord>
convertMetadataToPartitionStatsRecords(Ho
return engineContext.parallelize(partitionedWriteStats,
parallelism).flatMap(partitionedWriteStat -> {
final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
// Step 1: Collect Column Metadata for Each File part of current
commit metadata
- List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
partitionedWriteStat.stream()
+ List<List<HoodieColumnRangeMetadata<Comparable>>>
rawFileColumnMetadata = partitionedWriteStat.stream()
Review Comment:
no need to collect here. We can fuse the two transformations.
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1487,4 +1488,20 @@ public static Comparable<?>
unwrapAvroValueWrapper(Object avroValueWrapper) {
throw new UnsupportedOperationException(String.format("Unsupported type
of the value (%s)", avroValueWrapper.getClass()));
}
}
+
+ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper,
String wrapperClassName) {
+ if (avroValueWrapper == null) {
+ return null;
+ } else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
+ return LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0));
Review Comment:
we're doing `.get(0)` here and below. Could this break if the schema
evolves? Maybe use the field name instead of hardcoded field index?
##########
azure-pipelines-20230430.yml:
##########
@@ -214,7 +214,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_3
displayName: UT spark-datasource Java Tests & DDL
- timeoutInMinutes: '90'
+ timeoutInMinutes: '120'
Review Comment:
That's a 1.5x increase! Let's track the test modules refactoring separately,
but I think the main issue might be functional tests in hudi-spark-datasource.
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1487,4 +1488,20 @@ public static Comparable<?>
unwrapAvroValueWrapper(Object avroValueWrapper) {
throw new UnsupportedOperationException(String.format("Unsupported type
of the value (%s)", avroValueWrapper.getClass()));
}
}
+
+ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper,
String wrapperClassName) {
Review Comment:
Why aren't we calling this method in `unwrapAvroValueWrapper(Object
avroValueWrapper)` for Date, TimestampMicros and Decimal wrapper classes?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -295,22 +298,48 @@ private static void
constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS)
== null,
String.format("Valid %s record expected for type: %s",
SCHEMA_FIELD_ID_COLUMN_STATS,
MetadataPartitionType.COLUMN_STATS.getRecordType()));
} else {
- payload.columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
-
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
-
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
- // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
- // This causes Kryo to fail when deserializing a GenericRecord, See
HUDI-5484.
- // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
-
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
-
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
- .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
- .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
- .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
- .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
- .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
- .setIsTightBound((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
- .build();
+ try {
+ Pair<Boolean, String> isMinValueWrapperObfuscated =
getIsValueWrapperObfuscated(record, COLUMN_STATS_FIELD_MIN_VALUE);
+ Pair<Boolean, String> isMaxValueWrapperObfuscated =
getIsValueWrapperObfuscated(record, COLUMN_STATS_FIELD_MAX_VALUE);
+ payload.columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
+
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
+
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
+ // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
+ // This causes Kryo to fail when deserializing a GenericRecord,
See HUDI-5484.
+ // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
+ .setMinValue(wrapValueIntoAvro(
+ isMinValueWrapperObfuscated.getKey() ?
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE),
+ isMinValueWrapperObfuscated.getValue())
+ :
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
+
.setMaxValue(wrapValueIntoAvro(isMaxValueWrapperObfuscated.getKey() ?
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE),
+ isMaxValueWrapperObfuscated.getValue())
+ :
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
+ .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
+ .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
+ .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
+ .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
+ .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
+ .setIsTightBound((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
+ .build();
+ } catch (Throwable e) {
+ System.out.println("adfasdf");
+ throw e;
+ }
+ }
+ }
+
+ private static Pair<Boolean, String>
getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) {
+ Object minValue = ((GenericRecord)
record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
+ if (minValue != null) {
+ boolean toReturn = ((GenericRecord)
minValue).getSchema().getName().equals(DateWrapper.class.getSimpleName())
+ || ((GenericRecord)
minValue).getSchema().getName().equals(TimestampMicrosWrapper.class.getSimpleName())
+ || ((GenericRecord)
minValue).getSchema().getName().equals(TimestampMicrosWrapper.class.getSimpleName());
Review Comment:
typo - same condition
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -295,22 +298,48 @@ private static void
constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS)
== null,
String.format("Valid %s record expected for type: %s",
SCHEMA_FIELD_ID_COLUMN_STATS,
MetadataPartitionType.COLUMN_STATS.getRecordType()));
} else {
- payload.columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
-
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
-
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
- // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
- // This causes Kryo to fail when deserializing a GenericRecord, See
HUDI-5484.
- // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
-
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
-
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
- .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
- .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
- .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
- .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
- .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
- .setIsTightBound((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
- .build();
+ try {
+ Pair<Boolean, String> isMinValueWrapperObfuscated =
getIsValueWrapperObfuscated(record, COLUMN_STATS_FIELD_MIN_VALUE);
+ Pair<Boolean, String> isMaxValueWrapperObfuscated =
getIsValueWrapperObfuscated(record, COLUMN_STATS_FIELD_MAX_VALUE);
+ payload.columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
+
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
+
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
+ // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
+ // This causes Kryo to fail when deserializing a GenericRecord,
See HUDI-5484.
+ // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
+ .setMinValue(wrapValueIntoAvro(
+ isMinValueWrapperObfuscated.getKey() ?
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE),
+ isMinValueWrapperObfuscated.getValue())
+ :
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
+
.setMaxValue(wrapValueIntoAvro(isMaxValueWrapperObfuscated.getKey() ?
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE),
+ isMaxValueWrapperObfuscated.getValue())
+ :
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
+ .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
+ .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
+ .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
+ .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
+ .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
+ .setIsTightBound((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
+ .build();
+ } catch (Throwable e) {
+ System.out.println("adfasdf");
+ throw e;
+ }
+ }
+ }
+
+ private static Pair<Boolean, String>
getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) {
+ Object minValue = ((GenericRecord)
record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
+ if (minValue != null) {
+ boolean toReturn = ((GenericRecord)
minValue).getSchema().getName().equals(DateWrapper.class.getSimpleName())
+ || ((GenericRecord)
minValue).getSchema().getName().equals(TimestampMicrosWrapper.class.getSimpleName())
+ || ((GenericRecord)
minValue).getSchema().getName().equals(TimestampMicrosWrapper.class.getSimpleName());
+ //|| ((GenericRecord)
minValue).getSchema().getName().equals(DecimalWrapper.class.getSimpleName());
Review Comment:
Should the DecimalWrapper check be commented out?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -295,22 +298,48 @@ private static void
constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS)
== null,
String.format("Valid %s record expected for type: %s",
SCHEMA_FIELD_ID_COLUMN_STATS,
MetadataPartitionType.COLUMN_STATS.getRecordType()));
} else {
- payload.columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
-
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
-
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
- // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
- // This causes Kryo to fail when deserializing a GenericRecord, See
HUDI-5484.
- // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
-
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
-
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
- .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
- .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
- .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
- .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
- .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
- .setIsTightBound((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
- .build();
+ try {
+ Pair<Boolean, String> isMinValueWrapperObfuscated =
getIsValueWrapperObfuscated(record, COLUMN_STATS_FIELD_MIN_VALUE);
+ Pair<Boolean, String> isMaxValueWrapperObfuscated =
getIsValueWrapperObfuscated(record, COLUMN_STATS_FIELD_MAX_VALUE);
+ payload.columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
+
.setFileName(columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME).toString())
+
.setColumnName(columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME).toString())
+ // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
+ // This causes Kryo to fail when deserializing a GenericRecord,
See HUDI-5484.
+ // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
+ .setMinValue(wrapValueIntoAvro(
+ isMinValueWrapperObfuscated.getKey() ?
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE),
+ isMinValueWrapperObfuscated.getValue())
+ :
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
+
.setMaxValue(wrapValueIntoAvro(isMaxValueWrapperObfuscated.getKey() ?
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE),
+ isMaxValueWrapperObfuscated.getValue())
+ :
unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
+ .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
+ .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
+ .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
+ .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
+ .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
+ .setIsTightBound((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
+ .build();
+ } catch (Throwable e) {
+ System.out.println("adfasdf");
+ throw e;
+ }
+ }
+ }
+
+ private static Pair<Boolean, String>
getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) {
+ Object minValue = ((GenericRecord)
record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
Review Comment:
let's rename the variable to `statsValue` because it could be min or max.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -245,13 +245,19 @@ class ColumnStats {
// with the values from this record
targetFields.forEach(field -> {
ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(),
ignored -> new ColumnStats());
-
Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(recordSchema,
field.name());
Object fieldValue;
if (record.getRecordType() == HoodieRecordType.AVRO) {
fieldValue = HoodieAvroUtils.getRecordColumnValues(record, new
String[]{field.name()}, recordSchema, false)[0];
+ if (fieldSchema.getType() == Schema.Type.INT &&
fieldSchema.getLogicalType() != null &&
fieldSchema.getLogicalType().getName().equals("date")) {
+ fieldValue = java.sql.Date.valueOf(fieldValue.toString());
+ }
+
} else if (record.getRecordType() == HoodieRecordType.SPARK) {
fieldValue = record.getColumnValues(recordSchema, new
String[]{field.name()}, false)[0];
+ if (fieldSchema.getType() == Schema.Type.INT &&
fieldSchema.getLogicalType() != null &&
fieldSchema.getLogicalType().getName().equals("date")) {
Review Comment:
same here - can use fieldSchema.getLogicalType() == LogicalTypes.date()
instead of hardcoded string.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -772,6 +777,30 @@ private boolean getDefaultMetadataEnable(EngineType
engineType) {
throw new HoodieNotSupportedException("Unsupported engine " +
engineType);
}
}
+
+ private boolean getDefaultColStatsEnable(EngineType engineType) {
+ switch (engineType) {
+ case SPARK:
+ return true;
+ case FLINK:
+ case JAVA:
+ return false;
Review Comment:
Let's add jira in the comment
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1413,18 +1426,26 @@ private static List<String>
getColumnsToIndex(HoodieTableConfig tableConfig,
*/
private static Stream<String>
getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
Either<List<String>, Lazy<Option<Schema>>> tableSchema,
+
boolean freshTable,
Review Comment:
let's update the javadoc. Also, I think a better name would be
`isTableInitializing`
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java:
##########
@@ -101,9 +101,9 @@ public static Stream<Arguments> orderingValueParams() {
{new Double[] {Double.MIN_VALUE, 0.125, 809.25, Double.MAX_VALUE}},
{new String[] {"val1", "val2", "val3", null}},
{new Timestamp[] {new Timestamp(1690766971000L), new
Timestamp(1672536571000L)}},
- {new LocalDate[] {LocalDate.of(2023, 1, 1), LocalDate.of(1980, 7,
1)}},
- {new BigDecimal[] {new BigDecimal("12345678901234.2948"),
- new BigDecimal("23456789012345.4856")}}
+ {new LocalDate[] {LocalDate.of(2023, 1, 1), LocalDate.of(1980, 7,
1)}}
+ //{new BigDecimal[] {new BigDecimal("12345678901234.2948"),
Review Comment:
this should work, right? Or is the scale/precision out of range? Let's fix
this test.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java:
##########
@@ -614,6 +615,8 @@ public void
testWriteMultiWriterPartialOverlapping(WriteConcurrencyMode writeCon
public void testReuseEmbeddedServer() throws IOException {
conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
conf.setString("hoodie.metadata.enable","true");
+
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
"false");
Review Comment:
sounds good, but please add JIRA in the comment
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -117,10 +117,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
shouldReadInMemory: Boolean,
prunedPartitions: Option[Set[String]] = None,
prunedFileNamesOpt: Option[Set[String]] = None)(block:
DataFrame => T): T = {
- cachedColumnStatsIndexViews.get(targetColumns) match {
+ /*cachedColumnStatsIndexViews.get(targetColumns) match {
case Some(cachedDF) =>
block(cachedDF)
- case None =>
+ case None =>*/
Review Comment:
why is this commented out?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2550,9 +2578,20 @@ public static HoodieData<HoodieRecord>
convertMetadataToPartitionStatsRecords(Ho
return engineContext.parallelize(partitionedWriteStats,
parallelism).flatMap(partitionedWriteStat -> {
final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
// Step 1: Collect Column Metadata for Each File part of current
commit metadata
- List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
partitionedWriteStat.stream()
+ List<List<HoodieColumnRangeMetadata<Comparable>>>
rawFileColumnMetadata = partitionedWriteStat.stream()
.map(writeStat -> translateWriteStatToFileStats(writeStat,
dataMetaClient, validColumnsToIndex, tableSchema))
.collect(Collectors.toList());
+ // convert to payload and reconstruct the stats to maintain parity for
certain data types where avro wrapping and unwrapping could change the types.
+ List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
rawFileColumnMetadata.stream()
+ .map(new Function<List<HoodieColumnRangeMetadata<Comparable>>,
List<HoodieColumnRangeMetadata<Comparable>>>() {
+ @Override
+ public List<HoodieColumnRangeMetadata<Comparable>>
apply(List<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadata) {
+ return
HoodieMetadataPayload.createColumnStatsRecords(partitionName,
hoodieColumnRangeMetadata, false).map(record -> {
+ return
HoodieColumnRangeMetadata.fromColumnStats((((HoodieMetadataPayload)
record.getData()).getColumnStatMetadata().get()));
+ }).collect(toList());
+ }
+ }).collect(toList());
Review Comment:
so we are converting from a list of HoodieColumnRangeMetadata to
HoodieMetadataPayload and then back to HoodieColumnRangeMetadata. Sounds like a
lot of work. Can we avoid repeated transformations? My suggestion is to handle
parity in `HoodieTableMetadataUtil.collectColumnRangeMetadata`, which is used
even in the append handle.
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java:
##########
@@ -473,6 +474,7 @@ public void testMetadataTableServices() throws Exception {
.enable(true)
.enableMetrics(false)
.withMaxNumDeltaCommitsBeforeCompaction(3) // after 3 delta
commits for regular writer operations, compaction should kick in.
+ .withMetadataIndexColumnStats(false)
Review Comment:
Are we tracking colstats support for other engines? Please also add jira id
in the code comment.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java:
##########
@@ -363,7 +364,9 @@ private TableOptions defaultTableOptions(String tablePath) {
FlinkOptions.COMPACTION_TASKS.key(), 1,
FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false,
HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED.key(), false,
- HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true);
+ HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true,
Review Comment:
cool, please also add this jira in the comment.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java:
##########
@@ -267,7 +267,7 @@ private void testBootstrapCommon(boolean partitioned,
boolean deltaCommit, Effec
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass)
.withBootstrapModeForRegexMatch(modeForRegexMatch).build())
-
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3).withMetadataIndexColumnStats(false).build())
Review Comment:
please add jira in the code comment
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]