danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469029597


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map<String,String[]> hosts;
+  protected final Map<String, TypeInfo> columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
+
+  private final List<String> partitionCols;
+  private final Set<String> partitionColSet;
+
+  private final String recordKeyField;
+  protected HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator,
+                                    InputSplit split,
+                                    JobConf jobConf,
+                                    Reporter reporter,
+                                    Schema writerSchema,
+                                    Map<String,String[]> hosts,
+                                    HoodieTableMetaClient metaClient) {
+    this.readerCreator = readerCreator;
+    this.split = split;
+    this.jobConf = jobConf;
+    this.reporter = reporter;
+    this.writerSchema = writerSchema;
+    this.hosts = hosts;
+    this.partitionCols = HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
+            .filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
+    this.partitionColSet = new HashSet<>(this.partitionCols);
+    String tableName = metaClient.getTableConfig().getTableName();
+    recordKeyField = metaClient.getTableConfig().populateMetaFields()
+        ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+        : assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
+    this.objectInspectorCache = HoodieArrayWritableAvroUtils.getCacheForTable(tableName, writerSchema, jobConf);
+    this.columnTypeMap = objectInspectorCache.getColumnTypeMap();
+  }
+
+  private static String assertSingleKey(Option<String[]> recordKeyFieldsOpt) {
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.isPresent(), "no record key field");
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.get().length == 1, "more than 1 record key, and not meta fields");
+    return recordKeyFieldsOpt.get()[0];
+  }
+
+  @Override
+  public FileSystem getFs(String path, Configuration conf) {
+    return FSUtils.getFs(path, conf);
+  }
+
+  @Override
+  public ClosableIterator<ArrayWritable> getFileRecordIterator(Path filePath, long start, long length, Schema dataSchema, Schema requiredSchema, Configuration conf) throws IOException {
+    JobConf jobConfCopy = new JobConf(jobConf);
+    //move the partition cols to the end, because in some cases it has issues if we don't do that
+    Schema modifiedDataSchema = HoodieAvroUtils.generateProjectionSchema(dataSchema, Stream.concat(dataSchema.getFields().stream()
+            .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColSet.contains(n)),
+        partitionCols.stream().filter(c -> dataSchema.getField(c) != null)).collect(Collectors.toList()));
+    setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema);
+    InputSplit inputSplit = new FileSplit(filePath, start, length, hosts.get(filePath.toString()));
+    RecordReader<NullWritable, ArrayWritable> recordReader = readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter);
+    if (firstRecordReader == null) {
+      firstRecordReader = recordReader;
+    }
+    ClosableIterator<ArrayWritable> recordIterator = new RecordReaderValueIterator<>(recordReader);
+    if (modifiedDataSchema.equals(requiredSchema)) {
+      return recordIterator;
+    }
+    //The record reader puts the required columns in the positions of the data schema and nulls the rest of the columns
+    return new CloseableMappingIterator<>(recordIterator, projectRecord(modifiedDataSchema, requiredSchema));
+  }
+
+  private void setSchemas(JobConf jobConf, Schema dataSchema, Schema requiredSchema) {
+    List<String> dataColumnNameList = dataSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList());
+    List<TypeInfo> dataColumnTypeList = dataColumnNameList.stream().map(fieldName -> {
+      TypeInfo type = columnTypeMap.get(fieldName);
+      if (type == null) {
+        throw new IllegalArgumentException("Field: " + fieldName + ", does not have a defined type");
+      }
+      return type;
+    }).collect(Collectors.toList());
+    jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList));
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(",")));
+    //don't replace `f -> f.name()` with lambda reference
+    String readColNames = requiredSchema.getFields().stream().map(f -> f.name()).collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, requiredSchema.getFields()
+        .stream().map(f -> String.valueOf(dataSchema.getField(f.name()).pos())).collect(Collectors.joining(",")));
+  }
+
+  @Override
+  public ArrayWritable convertAvroRecord(IndexedRecord avroRecord) {
+    //should we support timestamp?
+    return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroRecord.getSchema(), false);
+  }
+
+  @Override
+  public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+    switch (mergerStrategy) {
+      case DEFAULT_MERGER_STRATEGY_UUID:
+        return new HoodieHiveRecordMerger();
+      default:
+        throw new HoodieException("The merger strategy UUID is not supported: " + mergerStrategy);

Review Comment:
   do we want to report a UUID in the error msg or the strategy name?
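
   For illustration only, a rough sketch of how the default branch could be worded if we surfaced the unsupported id alongside the strategy that is actually supported here; the message text and the use of String.format are just an example, not a concrete suggestion for this PR:

   ```java
   default:
     // hypothetical wording: report the unsupported strategy id and the expected default strategy UUID
     throw new HoodieException(String.format(
         "Unsupported record merger strategy '%s' for Hive reads; only the default strategy '%s' is supported",
         mergerStrategy, DEFAULT_MERGER_STRATEGY_UUID));
   ```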


