the-other-tim-brown commented on code in PR #13549:
URL: https://github.com/apache/hudi/pull/13549#discussion_r2208597409
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -285,6 +491,53 @@ protected Map<String, String>
getCommonConfigs(RecordMergeMode recordMergeMode,
return configMapping;
}
+ private void
validateOutputFromFileGroupReaderWithNativeRecords(StorageConfiguration<?>
storageConf,
+ String
tablePath,
+ boolean
containsBaseFile,
+ int
expectedLogFileNum,
+
RecordMergeMode recordMergeMode,
+
List<Pair<String, IndexedRecord>> expectedRecords,
+
List<Pair<String, IndexedRecord>> expectedUnmergedRecords) throws Exception {
Review Comment:
The unmerged records are never used. Do you want to update the tests to use
them or do we have enough coverage of the skipMerge functionality in the other
tests?
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java:
##########
@@ -191,4 +191,103 @@ private static void assertWritablePrimaryType(Schema
schema, Writable expected,
assertEquals(expected, actual);
}
}
+
+ public static void assertArrayWritableMatchesSchema(Schema schema, Writable
writable) {
+ switch (schema.getType()) {
+ case RECORD: {
+ assertInstanceOf(ArrayWritable.class, writable);
+ assertEquals(schema.getFields().size(), ((ArrayWritable)
writable).get().length);
+ for (Schema.Field field : schema.getFields()) {
+ assertArrayWritableMatchesSchema(field.schema(), ((ArrayWritable)
writable).get()[field.pos()]);
+ }
+ break;
+ }
+ case ARRAY: {
+ assertInstanceOf(ArrayWritable.class, writable);
+ for (int i = 0; i < ((ArrayWritable) writable).get().length; i++) {
+ assertArrayWritableMatchesSchema(schema.getElementType(),
((ArrayWritable) writable).get()[i]);
+ }
+ break;
+ }
+ case MAP: {
+ assertInstanceOf(ArrayWritable.class, writable);
+ for (int i = 0; i < ((ArrayWritable) writable).get().length; i++) {
+ Writable expectedKV = ((ArrayWritable) writable).get()[i];
+ assertInstanceOf(ArrayWritable.class, expectedKV);
+ assertEquals(2, ((ArrayWritable) expectedKV).get().length);
+ assertNotNull(((ArrayWritable) expectedKV).get()[0]);
Review Comment:
nitpick: just pull out the casting so you can reuse it in the assertion
##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ * Taken from
https://github.com/apache/hive/blob/01cc7ca672320447b20bc9883e2598d9fb34fc10/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
+ */
+public class HiveTypeUtils {
+ // Conversion of Avro primitive types to Hive primitive types
+ // Avro Hive
+ // Null
+ // boolean boolean check
+ // int int check
+ // long bigint check
+ // float double check
+ // double double check
+ // bytes binary check
+ // fixed binary check
+ // string string check
+ // tinyint
+ // smallint
+
+ // Map of Avro's primitive types to Hives (for those that are supported by
both)
+ private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO
= initTypeMap();
+ private static Map<Schema.Type, TypeInfo> initTypeMap() {
+ Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
Review Comment:
When the key is an enum, you can also use an `EnumMap`
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,209 @@ public void
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a log file only
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
- commitToTable(initialRecords, INSERT.value(), writeConfigs);
+ commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 1, recordMergeMode,
initialRecords, initialRecords);
// Two commits; reading one file group containing two log files
List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
List<HoodieRecord> allRecords = mergeRecordLists(updates,
initialRecords);
- commitToTable(updates, UPSERT.value(), writeConfigs);
+ commitToTable(updates, INSERT.value(), false, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 2, recordMergeMode,
allRecords, CollectionUtils.combine(initialRecords, updates));
}
}
+ private static List<Pair<String, IndexedRecord>>
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+ return hoodieRecords.stream().map(r -> {
+ try {
+ return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).filter(Option::isPresent).map(Option::get).map(r ->
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+ }
+
+ /**
+ * Write a base file with schema A, then write another base file with schema
B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws
Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write a base file with schema A
+ List<HoodieRecord> firstRecords =
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
dataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords, firstIndexedRecords);
+
+ // Evolve schema
+ dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write another base file with schema B
+ List<HoodieRecord> secondRecords =
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+ commitToTable(secondRecords, INSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords, mergedRecords);
+ }
+ }
+
+ /**
+ * Write a base file with schema A, then write a log file with schema A,
then write another base file with schema B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenBaseFileHasDifferentSchemaThanLogFiles()
throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write a base file with schema A
+ List<HoodieRecord> firstRecords =
dataGen.generateInsertsForPartition("001", 10, "any_partition");
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
dataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords, firstIndexedRecords);
+
+ // Write a log file with schema A
+ List<HoodieRecord> secondRecords = dataGen.generateUniqueUpdates("002",
5);
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+ commitToTable(secondRecords, UPSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+ List<Pair<String, IndexedRecord>> unmergedRecords =
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords, unmergedRecords);
+
+ // Evolve schema
+ dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write another base file with schema B
+ List<HoodieRecord> thirdRecords =
dataGen.generateInsertsForPartition("003", 5, "new_partition");
+ List<Pair<String, IndexedRecord>> thirdIndexedRecords =
hoodieRecordsToIndexedRecords(thirdRecords, dataGen.getExtendedSchema());
+ commitToTable(thirdRecords, INSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ mergedRecords = CollectionUtils.combine(mergedRecords,
thirdIndexedRecords);
+ unmergedRecords = CollectionUtils.combine(unmergedRecords,
thirdIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
Review Comment:
Can you add an inline comment explaining that the `-1` is used because there is a mix in the expected number of log files per base file, so the log-file-count check is disabled?
##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ * Taken from
https://github.com/apache/hive/blob/01cc7ca672320447b20bc9883e2598d9fb34fc10/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
Review Comment:
There's no way to just import this functionality?
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -1276,5 +1283,272 @@ public String getPartitionPath() {
public String getRiderValue() {
return riderValue;
}
+
+ @Override
+ public String toString() {
+ return "RowKey: " + recordKey + ", PartitionPath: " + partitionPath
+ + ", OrderingVal: " + orderingVal + ", RiderValue: " + riderValue;
+ }
+ }
+
+ public static class SchemaEvolutionConfigs {
+ public Schema schema = AVRO_SCHEMA;
+ public boolean nestedSupport = true;
+ public boolean mapSupport = true;
+ public boolean arraySupport = true;
+ public boolean addNewFieldSupport = true;
+
+ // Int
+ public boolean intToLongSupport = true;
+ public boolean intToFloatSupport = true;
+ public boolean intToDoubleSupport = true;
+ public boolean intToStringSupport = true;
+
+ // Long
+ public boolean longToFloatSupport = true;
+ public boolean longToDoubleSupport = true;
+ public boolean longToStringSupport = true;
+
+ // Float
+ public boolean floatToDoubleSupport = true;
+ public boolean floatToStringSupport = true;
+
+ // Double
+ public boolean doubleToStringSupport = true;
+
+ // String
+ public boolean stringToBytesSupport = true;
+
+ // Bytes
+ public boolean bytesToStringSupport = true;
+ }
+
+ public void extendSchemaBeforeEvolution(SchemaEvolutionConfigs configs) {
+ List<Schema.Type> baseFields = new ArrayList<>();
+ baseFields.add(Schema.Type.INT);
+ if (configs.intToLongSupport) {
Review Comment:
I'm trying to think of a way to combine this method with the one below. If
we could define these cases as small case classes, or even enums, each with a
before & after schema type, it may be possible to clean this up a bit.
##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveSerdeAvroUtils.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utilities useful only to the AvroSerde itself. Not mean to be used by
+ * end-users but public for interop to the ql package.
+ * Taken from
https://github.com/apache/hive/blob/01cc7ca672320447b20bc9883e2598d9fb34fc10/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
Review Comment:
Why can't we just import this class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]