the-other-tim-brown commented on code in PR #13549:
URL: https://github.com/apache/hudi/pull/13549#discussion_r2208597409


##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -285,6 +491,53 @@ protected Map<String, String> 
getCommonConfigs(RecordMergeMode recordMergeMode,
     return configMapping;
   }
 
+  private void 
validateOutputFromFileGroupReaderWithNativeRecords(StorageConfiguration<?> 
storageConf,
+                                                                    String 
tablePath,
+                                                                    boolean 
containsBaseFile,
+                                                                    int 
expectedLogFileNum,
+                                                                    
RecordMergeMode recordMergeMode,
+                                                                    
List<Pair<String, IndexedRecord>> expectedRecords,
+                                                                    
List<Pair<String, IndexedRecord>> expectedUnmergedRecords) throws Exception {

Review Comment:
   The unmerged records are never used. Do you want to update the tests to use 
them or do we have enough coverage of the skipMerge functionality in the other 
tests?



##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java:
##########
@@ -191,4 +191,103 @@ private static void assertWritablePrimaryType(Schema 
schema, Writable expected,
         assertEquals(expected, actual);
     }
   }
+
+  public static void assertArrayWritableMatchesSchema(Schema schema, Writable 
writable) {
+    switch (schema.getType()) {
+      case RECORD: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        assertEquals(schema.getFields().size(), ((ArrayWritable) 
writable).get().length);
+        for (Schema.Field field : schema.getFields()) {
+          assertArrayWritableMatchesSchema(field.schema(), ((ArrayWritable) 
writable).get()[field.pos()]);
+        }
+        break;
+      }
+      case ARRAY: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        for (int i = 0; i < ((ArrayWritable) writable).get().length; i++) {
+          assertArrayWritableMatchesSchema(schema.getElementType(), 
((ArrayWritable) writable).get()[i]);
+        }
+        break;
+      }
+      case MAP: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        for (int i = 0; i < ((ArrayWritable) writable).get().length; i++) {
+          Writable expectedKV = ((ArrayWritable) writable).get()[i];
+          assertInstanceOf(ArrayWritable.class, expectedKV);
+          assertEquals(2, ((ArrayWritable) expectedKV).get().length);
+          assertNotNull(((ArrayWritable) expectedKV).get()[0]);

Review Comment:
   nitpick: just pull out the casting so you can reuse it in the assertion



##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ * Taken from 
https://github.com/apache/hive/blob/01cc7ca672320447b20bc9883e2598d9fb34fc10/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
+ */
+public class HiveTypeUtils {
+  // Conversion of Avro primitive types to Hive primitive types
+  // Avro             Hive
+  // Null
+  // boolean          boolean    check
+  // int              int        check
+  // long             bigint     check
+  // float            double     check
+  // double           double     check
+  // bytes            binary     check
+  // fixed            binary     check
+  // string           string     check
+  //                  tinyint
+  //                  smallint
+
+  // Map of Avro's primitive types to Hives (for those that are supported by 
both)
+  private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO 
= initTypeMap();
+  private static Map<Schema.Type, TypeInfo> initTypeMap() {
+    Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();

Review Comment:
   When the key is an enum, you can also use an `EnumMap`



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,209 @@ public void 
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, INSERT.value(), writeConfigs);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 1, recordMergeMode,
           initialRecords, initialRecords);
 
       // Two commits; reading one file group containing two log files
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, INSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 2, recordMergeMode,
           allRecords, CollectionUtils.combine(initialRecords, updates));
     }
   }
 
+  private static List<Pair<String, IndexedRecord>> 
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+    return hoodieRecords.stream().map(r -> {
+      try {
+        return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).filter(Option::isPresent).map(Option::get).map(r -> 
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+  }
+
+  /**
+   * Write a base file with schema A, then write another base file with schema 
B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws 
Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords, firstIndexedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> secondRecords = 
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords, mergedRecords);
+    }
+  }
+
+  /**
+   * Write a base file with schema A, then write a log file with schema A, 
then write another base file with schema B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFileHasDifferentSchemaThanLogFiles() 
throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 10, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords, firstIndexedRecords);
+
+      // Write a log file with schema A
+      List<HoodieRecord> secondRecords = dataGen.generateUniqueUpdates("002", 
5);
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, UPSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+      List<Pair<String, IndexedRecord>> unmergedRecords = 
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords, unmergedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> thirdRecords = 
dataGen.generateInsertsForPartition("003", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> thirdIndexedRecords = 
hoodieRecordsToIndexedRecords(thirdRecords, dataGen.getExtendedSchema());
+      commitToTable(thirdRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      mergedRecords = CollectionUtils.combine(mergedRecords, 
thirdIndexedRecords);
+      unmergedRecords = CollectionUtils.combine(unmergedRecords, 
thirdIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, -1, RecordMergeMode.EVENT_TIME_ORDERING,

Review Comment:
   Can you add an inline comment explaining that the `-1` is used because there 
is a mix in the expected number of log files per base file, so the check is disabled?



##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ * Taken from 
https://github.com/apache/hive/blob/01cc7ca672320447b20bc9883e2598d9fb34fc10/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java

Review Comment:
   Is there no way to just import this functionality instead of copying it?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -1276,5 +1283,272 @@ public String getPartitionPath() {
     public String getRiderValue() {
       return riderValue;
     }
+
+    @Override
+    public String toString() {
+      return "RowKey: " + recordKey + ", PartitionPath: " + partitionPath
+          + ", OrderingVal: " + orderingVal + ", RiderValue: " + riderValue;
+    }
+  }
+
+  public static class SchemaEvolutionConfigs {
+    public Schema schema = AVRO_SCHEMA;
+    public boolean nestedSupport = true;
+    public boolean mapSupport = true;
+    public boolean arraySupport = true;
+    public boolean addNewFieldSupport = true;
+
+    // Int
+    public boolean intToLongSupport = true;
+    public boolean intToFloatSupport = true;
+    public boolean intToDoubleSupport = true;
+    public boolean intToStringSupport = true;
+
+    // Long
+    public boolean longToFloatSupport = true;
+    public boolean longToDoubleSupport = true;
+    public boolean longToStringSupport = true;
+
+    // Float
+    public boolean floatToDoubleSupport = true;
+    public boolean floatToStringSupport = true;
+
+    // Double
+    public boolean doubleToStringSupport = true;
+
+    // String
+    public boolean stringToBytesSupport = true;
+
+    // Bytes
+    public boolean bytesToStringSupport = true;
+  }
+
+  public void extendSchemaBeforeEvolution(SchemaEvolutionConfigs configs) {
+    List<Schema.Type> baseFields = new ArrayList<>();
+    baseFields.add(Schema.Type.INT);
+    if (configs.intToLongSupport) {

Review Comment:
   I'm trying to think of a way to combine this method with the one below. If 
we could define these cases as small case classes, or even enums, each with a 
before-and-after schema type, it may be possible to clean this up a bit.



##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveSerdeAvroUtils.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utilities useful only to the AvroSerde itself.  Not mean to be used by
+ * end-users but public for interop to the ql package.
+ * Taken from 
https://github.com/apache/hive/blob/01cc7ca672320447b20bc9883e2598d9fb34fc10/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java

Review Comment:
   Why can't we just import this class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to