jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1631640824


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+
+public class HoodieFileGroupReaderRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader<ArrayWritable> fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator<ArrayWritable> reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+                                           final InputSplit split,
+                                           final JobConf jobConf,
+                                           final Reporter reporter) throws IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(jobConfCopy)
+        .setBasePath(tableBasePath)
+        .build();
+    String latestCommitTime = getLatestCommitTime(split, metaClient);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime);
+    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+    Map<String, String[]> hosts = new HashMap<>();
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConfCopy, reporter, tableSchema, hosts, metaClient);
+    this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]);
+    //get some config vals
+    long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), MAX_MEMORY_FOR_MERGE.defaultValue());
+    String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath());
+    ExternalSpillableMap.DiskMapType spillMapType = ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
+        SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
+    boolean bitmaskCompressEnabled = jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+
+    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, jobConfCopy, tableBasePath,
+        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, readerContext.getFs(tableBasePath, jobConfCopy), tableBasePath),
+        tableSchema, requestedSchema, metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(),
+        fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath, spillMapType, bitmaskCompressEnabled);
+    this.fileGroupReader.initRecordIterators();
+    //it expects the partition columns to be at the end
+    Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
+        Stream.concat(tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)),
+            partitionColumns.stream()).collect(Collectors.toList()));
+    this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, outputSchema);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws IOException {
+    if (!fileGroupReader.hasNext()) {
+      return false;
+    }
+    value.set(fileGroupReader.next().get());
+    reverseProjection.apply(value);
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return nullWritable;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return arrayWritable;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return readerContext.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    fileGroupReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return readerContext.getProgress();
+  }
+
+  public RealtimeSplit getSplit() {
+    return (RealtimeSplit) inputSplit;
+  }
+
+  public JobConf getJobConf() {
+    return jobConfCopy;
+  }
+
+  public static List<String> getPartitionFieldNames(JobConf jobConf) {
+    String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+    return partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+        : new ArrayList<>();
+  }
+
+  private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, JobConf jobConf, String latestCommitTime) {
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    try {
+      Schema schema = tableSchemaResolver.getTableAvroSchema(latestCommitTime);
+      // Add partitioning fields to writer schema for resulting row to contain null values for these fields
+      return HoodieRealtimeRecordReaderUtils.addPartitionFields(schema, getPartitionFieldNames(jobConf));
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to get table schema", e);
+    }
+  }
+
+  public static String getTableBasePath(InputSplit split, JobConf jobConf) throws IOException {
+    if (split instanceof RealtimeSplit) {
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      return realtimeSplit.getBasePath();
+    } else {
+      Path inputPath = ((FileSplit)split).getPath();
+      FileSystem fs =  inputPath.getFileSystem(jobConf);
+      Option<Path> tablePath = TablePathUtils.getTablePath(fs, inputPath);
+      return tablePath.get().toString();
+    }
+  }
+
+  private static String getLatestCommitTime(InputSplit split, HoodieTableMetaClient metaClient) {
+    if (split instanceof RealtimeSplit) {
+      return ((RealtimeSplit) split).getMaxCommitTime();
+    }
+    Option<HoodieInstant> lastInstant = metaClient.getCommitsTimeline().lastInstant();
+    if (lastInstant.isPresent()) {
+      return lastInstant.get().getTimestamp();
+    } else {
+      return "";
+    }
+  }
+
+  /**
+   * Convert FileSplit to FileSlice, but save the locations in 'hosts' because that data is otherwise lost.
+   */
+  private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
+    BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs);
+    if (split instanceof RealtimeSplit) {
+      //mor
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      boolean isLogFile = FSUtils.isLogFile(realtimeSplit.getPath());
+      String fileID;
+      String commitTime;
+      if (isLogFile) {
+        fileID = FSUtils.getFileIdFromLogPath(realtimeSplit.getPath());
+        commitTime = FSUtils.getDeltaCommitTimeFromLogPath(realtimeSplit.getPath());
+      } else {
+        fileID = FSUtils.getFileId(realtimeSplit.getPath().getName());
+        commitTime = FSUtils.getCommitTime(realtimeSplit.getPath().toString());
+      }
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getPartitionPath(realtimeSplit.getBasePath(),
+          realtimeSplit.getPath().getParent().toString()).toString(), fileID);
+      if (isLogFile) {
+        return new FileSlice(fileGroupId, commitTime, null, realtimeSplit.getDeltaLogFiles());
+      }
+      hosts.put(realtimeSplit.getPath().toString(), realtimeSplit.getLocations());
+      HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fs.getFileStatus(realtimeSplit.getPath()), bootstrapBaseFile);
+      return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, realtimeSplit.getDeltaLogFiles());
+    }
+    //cow
+    HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getFileId(split.getPath().getName()),
+        FSUtils.getPartitionPath(tableBasePath, split.getPath().getParent().toString()).toString());
+    hosts.put(split.getPath().toString(), split.getLocations());
+    return new FileSlice(fileGroupId, FSUtils.getCommitTime(split.getPath().toString()), new HoodieBaseFile(fs.getFileStatus(split.getPath()), bootstrapBaseFile), Collections.emptyList());
+  }
+
+  private static BaseFile createBootstrapBaseFile(FileSplit split, Map<String, String[]> hosts, FileSystem fs) throws IOException {
+    if (split instanceof BootstrapBaseFileSplit) {
+      BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit) split;
+      FileSplit bootstrapFileSplit = bootstrapBaseFileSplit.getBootstrapFileSplit();
+      hosts.put(bootstrapFileSplit.getPath().toString(), bootstrapFileSplit.getLocations());
+      return new BaseFile(fs.getFileStatus(bootstrapFileSplit.getPath()));
+    }
+    return null;
+  }
+
+  private static Schema createRequestedSchema(Schema tableSchema, JobConf jobConf) {
+    String readCols = jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (StringUtils.isNullOrEmpty(readCols)) {
+      Schema emptySchema = Schema.createRecord(tableSchema.getName(), tableSchema.getDoc(),
+          tableSchema.getNamespace(), tableSchema.isError());
+      emptySchema.setFields(Collections.emptyList());
+      return emptySchema;
+    }
+    //hive will handle the partition cols

Review Comment:
   You can add partition columns in Hive that are not actually in the table. If any of
the requested read columns fall into that category, we just need to leave space for
them in the output, and Hive will populate the values.


