Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-05-31 Thread via GitHub


codope commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1622387730


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+
+public class HoodieFileGroupReaderRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+                                           final InputSplit split,
+                                           final JobConf jobConf,
+                                           final Reporter reporter) throws IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+String tableBasePath = getTableBasePath(split, jo

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-05-20 Thread via GitHub


codope commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1607649124


##
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##
@@ -123,10 +129,23 @@ public abstract ClosableIterator getFileRecordIterator(
* @param propsProperties.
* @return The ordering value.
*/
-  public abstract Comparable getOrderingValue(Option recordOption,
+  public Comparable getOrderingValue(Option recordOption,

Review Comment:
   nit: indentation



##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
+
+  private boolean copy;
+  private boolean isDeleted;

Review Comment:
   make it final?



##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://w

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-20 Thread via GitHub


jonvex commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1954508175

   > Overall looks good to me. @jonvex : What Hive versions are we targeting/testing?
   
   @bvaradar I used the docker demo to test, which I believe runs Hive 2. We would like this to replace the existing implementation, so the goal is to support everything that works when the file group reader is disabled.
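
   For readers following the thread, a minimal sketch of the kind of toggle being referred to, read from the JobConf (the property key and default below are assumptions for illustration; in the PR the check goes through HoodieFileGroupReaderRecordReader.useFilegroupReader(job) and HoodieReaderConfig):

   import org.apache.hadoop.mapred.JobConf;

   public final class FileGroupReaderToggleSketch {
     // Hypothetical key/default, used here only to illustrate the on/off switch.
     private static final String FILE_GROUP_READER_ENABLED_KEY = "hoodie.file.group.reader.enabled";
     private static final boolean FILE_GROUP_READER_ENABLED_DEFAULT = true;

     // True when the new HoodieFileGroupReader-based path should be used for this job.
     public static boolean useFileGroupReader(JobConf job) {
       return job.getBoolean(FILE_GROUP_READER_ENABLED_KEY, FILE_GROUP_READER_ENABLED_DEFAULT);
     }
   }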





Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-20 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1496057861


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map hosts;
+  protected final Map columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader firstRecordReader = null;
+
+  private final List partitionCols;
+  private final Set partitionColSet;
+
+  private final String recordKeyField;
+  protected HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator,
+InputSplit split,
+JobConf jobConf,
+Reporter reporter,
+Schema writerSchema,
+Map hosts,
+HoodieTableMetaClient metaClient) {
+this.readerCreator = readerCreator;
+this.split = split;
+this.jobConf = jobConf;
+this.reporter = reporter;
+this.writerSchema = writerSchema;
+this.hosts = hosts;
+    this.partitionCols = HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
+        .filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
+this.partitionColSet = new HashSet<>(this.partitionCols);
+String tableName = metaClient.getTableConfig().getTableName();
+recordKeyField = metaClient.getTableConfig().populateMetaFields()
+? HoodieRecord.RECORD_KEY_METADATA_FIELD
+: assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
+this.objectInspectorCache = 
HoodieArrayWritableAvroUtils.getCacheF

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-20 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1496053647


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map hosts;
+  protected final Map columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader firstRecordReader = null;
+
+  private final List partitionCols;
+  private final Set partitionColSet;
+
+  private final String recordKeyField;
+  protected HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator,
+InputSplit split,
+JobConf jobConf,
+Reporter reporter,
+Schema writerSchema,
+Map hosts,
+HoodieTableMetaClient metaClient) {
+this.readerCreator = readerCreator;
+this.split = split;
+this.jobConf = jobConf;
+this.reporter = reporter;
+this.writerSchema = writerSchema;
+this.hosts = hosts;
+    this.partitionCols = HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
+        .filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
+this.partitionColSet = new HashSet<>(this.partitionCols);
+String tableName = metaClient.getTableConfig().getTableName();
+recordKeyField = metaClient.getTableConfig().populateMetaFields()
+? HoodieRecord.RECORD_KEY_METADATA_FIELD
+: assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
+this.objectInspectorCache = 
HoodieArrayWritableAvroUtils.getCacheF

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-20 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1496049414


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {

Review Comment:
   How do I do that? HiveHoodieReaderContext is created every time 
getRecordReader is called in HoodieParquetInputFormat.
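
   To make the lifecycle concrete, a tiny self-contained sketch (the names are stand-ins, not the PR's code): every getRecordReader call builds a fresh record reader, which builds its own reader context, so a context never outlives one split of one query.

   import java.util.concurrent.atomic.AtomicInteger;

   public class ReaderContextLifecycleSketch {
     static final AtomicInteger CONTEXTS_CREATED = new AtomicInteger();

     // Stand-in for HiveHoodieReaderContext: constructed once per record reader.
     static class FakeReaderContext {
       FakeReaderContext() {
         CONTEXTS_CREATED.incrementAndGet();
       }
     }

     // Stand-in for HoodieParquetInputFormat#getRecordReader: called once per input split.
     static FakeReaderContext getRecordReader(String split) {
       return new FakeReaderContext();
     }

     public static void main(String[] args) {
       getRecordReader("split-1");
       getRecordReader("split-2");
       System.out.println(CONTEXTS_CREATED.get()); // 2 -> one context per split
     }
   }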






Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-11 Thread via GitHub


bvaradar commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1485731251


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {

Review Comment:
In the case of Hive, can you re-confirm that the HiveHoodieReaderContext instance scope is within a single query execution?



##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collecti

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-07 Thread via GitHub


jonvex commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1932549046

   Thanks @bvaradar, we all appreciate it!





Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-06 Thread via GitHub


bvaradar commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1931349128

   > @bvaradar Can you help the review of the hive related code?
   
   Yes @danny0405. Will review this PR.





Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-03 Thread via GitHub


xicm commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1477164731


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java:
##
@@ -91,9 +94,42 @@ private void initAvroInputFormat() {
 }
   }
 
+  private static boolean checkIfHudiTable(final InputSplit split, final JobConf job) {
+    try {
+      Option<Path> tablePathOpt = TablePathUtils.getTablePath(((FileSplit) split).getPath(), job);
+      if (!tablePathOpt.isPresent()) {
+        return false;
+      }
+      return tablePathOpt.get().getFileSystem(job).exists(new Path(tablePathOpt.get(), HoodieTableMetaClient.METAFOLDER_NAME));
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
                                                                    final Reporter reporter) throws IOException {
+
+    if (HoodieFileGroupReaderRecordReader.useFilegroupReader(job)) {
+      try {
+        if (!(split instanceof FileSplit) || !checkIfHudiTable(split, job)) {
+          return super.getRecordReader(split, job, reporter);
+        }
+        if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) {
+          return new HoodieFileGroupReaderRecordReader((s, j, r) -> {
+            try {
+              return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), s, j, r);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }, split, job, reporter);
+        } else {
+          return new HoodieFileGroupReaderRecordReader(super::getRecordReader, split, job, reporter);

Review Comment:
   Fixed.






Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-02 Thread via GitHub


danny0405 commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1925026965

   @bvaradar Can you help the review of the hive related code?





Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-02 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1476626353


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HoodieFileGroupReaderRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+                                           final InputSplit split,
+                                           final JobConf jobConf,
+                                           final Reporter reporter) throws IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(jobConfCopy)
+        .setBasePath(tableBasePath)
+        .build();
+    String latestCommitTime = getLatestCommitTime(split, metaClient);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime);
+Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+Map hosts = new HashMap<>();
+this.readerContext = new HiveHoodieRea

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-02-02 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1476613616


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java:
##
@@ -91,9 +94,42 @@ private void initAvroInputFormat() {
 }
   }
 
+  private static boolean checkIfHudiTable(final InputSplit split, final JobConf job) {
+    try {
+      Option<Path> tablePathOpt = TablePathUtils.getTablePath(((FileSplit) split).getPath(), job);
+      if (!tablePathOpt.isPresent()) {
+        return false;
+      }
+      return tablePathOpt.get().getFileSystem(job).exists(new Path(tablePathOpt.get(), HoodieTableMetaClient.METAFOLDER_NAME));
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
                                                                    final Reporter reporter) throws IOException {
+
+    if (HoodieFileGroupReaderRecordReader.useFilegroupReader(job)) {
+      try {
+        if (!(split instanceof FileSplit) || !checkIfHudiTable(split, job)) {
+          return super.getRecordReader(split, job, reporter);
+        }
+        if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) {
+          return new HoodieFileGroupReaderRecordReader((s, j, r) -> {
+            try {
+              return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), s, j, r);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }, split, job, reporter);
+        } else {
+          return new HoodieFileGroupReaderRecordReader(super::getRecordReader, split, job, reporter);

Review Comment:
   I added your suggestion. Could you please let me know if that fixes the 
issue? Thanks!






Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-30 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1471439782


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * To read value from an ArrayWritable, an ObjectInspector is needed.
+ * Object inspectors are cached here or created using the column type map.
+ */
+public class ObjectInspectorCache {
+  private final Map columnTypeMap = new HashMap<>();
+  private final Cache objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build();
+
+  public Map getColumnTypeMap() {
+return columnTypeMap;
+  }
+
+  public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) {
+    // From AbstractRealtimeRecordReader#prepareHiveAvroSerializer
+    // Hive will append virtual columns at the end of the column list. We should remove those columns.
+    // eg: current table is col1, col2, col3; jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3, BLOCK__OFFSET__INSIDE__FILE ...
+    Set<String> writerSchemaColNames = tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+    List<String> columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+    List<TypeInfo> columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+
+    int columnNameListLen = columnNameList.size() - 1;
+    for (int i = columnNameListLen; i >= 0; i--) {
+      String lastColName = columnNameList.get(columnNameList.size() - 1);
+      // virtual columns will only be appended at the end of the column list, so it is ok to break the loop.
+      if (writerSchemaColNames.contains(lastColName)) {
+        break;
+      }
+      columnNameList.remove(columnNameList.size() - 1);
+      columnTypeList.remove(columnTypeList.size() - 1);
+    }
+
+    // Use columnNameList.size() instead of columnTypeList because the type list is longer for some reason
+    IntStream.range(0, columnNameList.size()).boxed().forEach(i -> columnTypeMap.put(columnNameList.get(i),
+        TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0)));
+
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+    ArrayWritableObjectInspector objectInspector = new ArrayWritableObjectInspector(rowTypeInfo);

Review Comment:
   FYI this is pretty much a copy of 
https://github.com/apache/hudi/blob/e9389ffde53fa2b28feba248b7e8f17fd565e458/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java#L111
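
   To make the inspector's role concrete, a minimal usage sketch built on the same APIs the cache uses (the column names and row values below are made up for illustration):

   import java.util.Arrays;
   import java.util.List;

   import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
   import org.apache.hadoop.hive.serde2.objectinspector.StructField;
   import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
   import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
   import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
   import org.apache.hadoop.io.ArrayWritable;
   import org.apache.hadoop.io.IntWritable;
   import org.apache.hadoop.io.Text;
   import org.apache.hadoop.io.Writable;

   public class ObjectInspectorUsageSketch {
     public static void main(String[] args) {
       // Build a struct type "col1 string, col2 int" the same way the cache does.
       List<String> names = Arrays.asList("col1", "col2");
       StructTypeInfo rowType = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(
           names, TypeInfoUtils.getTypeInfosFromTypeString("string,int"));
       ArrayWritableObjectInspector inspector = new ArrayWritableObjectInspector(rowType);

       // A row as Hive's MR path represents it: one Writable per column.
       ArrayWritable row = new ArrayWritable(Writable.class,
           new Writable[] {new Text("hudi"), new IntWritable(42)});

       // Use the inspector to pull a column value out of the row without knowing its position.
       StructField col2 = inspector.getStructFieldRef("col2");
       System.out.println(inspector.getStructFieldData(row, col2)); // 42
     }
   }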






Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-30 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1471420556


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java:
##
@@ -91,9 +94,42 @@ private void initAvroInputFormat() {
 }
   }
 
+  private static boolean checkIfHudiTable(final InputSplit split, final JobConf job) {
+    try {
+      Option<Path> tablePathOpt = TablePathUtils.getTablePath(((FileSplit) split).getPath(), job);
+      if (!tablePathOpt.isPresent()) {
+        return false;
+      }
+      return tablePathOpt.get().getFileSystem(job).exists(new Path(tablePathOpt.get(), HoodieTableMetaClient.METAFOLDER_NAME));
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
                                                                    final Reporter reporter) throws IOException {
+
+    if (HoodieFileGroupReaderRecordReader.useFilegroupReader(job)) {
+      try {

Review Comment:
   
https://github.com/apache/hudi/blob/2c38ef740d3d34e9eb05b59fa147c55623b81a90/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java#L249
 I remove the partition fields from the read columns if the parquet file 
doesn't contain them. Does that help?
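
   For anyone following along, a hedged sketch of that idea, trimming partition columns out of the projected read columns (it uses the standard Hive projection property, but the helper itself is illustrative, not the PR's code):

   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;

   import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
   import org.apache.hadoop.mapred.JobConf;

   public class PartitionColumnTrimSketch {
     // Drop partition columns from the projected read columns when the data files
     // do not physically contain them; Hive fills those values from the partition path.
     public static List<String> trimPartitionColumns(JobConf job, Set<String> partitionColumns) {
       String projected = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
       return Arrays.stream(projected.split(","))
           .filter(col -> !col.isEmpty() && !partitionColumns.contains(col))
           .collect(Collectors.toList());
     }

     public static void main(String[] args) {
       JobConf job = new JobConf();
       job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "key,value,dt");
       System.out.println(trimPartitionColumns(job, new HashSet<>(Arrays.asList("dt"))));
       // -> [key, value]
     }
   }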






Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-30 Thread via GitHub


xiarixiaoyao commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1471121556


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * To read value from an ArrayWritable, an ObjectInspector is needed.
+ * Object inspectors are cached here or created using the column type map.
+ */
+public class ObjectInspectorCache {
+  private final Map columnTypeMap = new HashMap<>();
+  private final Cache objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build();
+
+  public Map getColumnTypeMap() {
+return columnTypeMap;
+  }
+
+  public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) {
+    // From AbstractRealtimeRecordReader#prepareHiveAvroSerializer
+    // Hive will append virtual columns at the end of the column list. We should remove those columns.
+    // eg: current table is col1, col2, col3; jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3, BLOCK__OFFSET__INSIDE__FILE ...
+    Set<String> writerSchemaColNames = tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+    List<String> columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+    List<TypeInfo> columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+
+    int columnNameListLen = columnNameList.size() - 1;
+    for (int i = columnNameListLen; i >= 0; i--) {
+      String lastColName = columnNameList.get(columnNameList.size() - 1);
+      // virtual columns will only be appended at the end of the column list, so it is ok to break the loop.
+      if (writerSchemaColNames.contains(lastColName)) {
+        break;
+      }
+      columnNameList.remove(columnNameList.size() - 1);
+      columnTypeList.remove(columnTypeList.size() - 1);
+    }
+
+    // Use columnNameList.size() instead of columnTypeList because the type list is longer for some reason
+    IntStream.range(0, columnNameList.size()).boxed().forEach(i -> columnTypeMap.put(columnNameList.get(i),
+        TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0)));
+
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+    ArrayWritableObjectInspector objectInspector = new ArrayWritableObjectInspector(rowTypeInfo);

Review Comment:
   > There may be compatibility issues between hive2 and hive3. DATE, TIMESTAMP
   
   I think Hive will handle this itself.






Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-29 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1470524689


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map hosts;
+  protected final Map columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader firstRecordReader = null;
+
+  private final List partitionCols;
+  private final Set partitionColSet;
+
+  private final String recordKeyField;
+  protected HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator,
+InputSplit split,
+JobConf jobConf,
+Reporter reporter,
+Schema writerSchema,
+Map hosts,
+HoodieTableMetaClient metaClient) {
+this.readerCreator = readerCreator;
+this.split = split;
+this.jobConf = jobConf;
+this.reporter = reporter;
+this.writerSchema = writerSchema;
+this.hosts = hosts;
+    this.partitionCols = HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
+        .filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
+this.partitionColSet = new HashSet<>(this.partitionCols);
+String tableName = metaClient.getTableConfig().getTableName();
+recordKeyField = metaClient.getTableConfig().populateMetaFields()
+? HoodieRecord.RECORD_KEY_METADATA_FIELD
+: assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
+this.objectInspectorCache = 
HoodieArrayWritableAvroUtils.getCacheForT

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-29 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469933163


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+public class HoodieHiveRecordMerger implements HoodieRecordMerger {
+  @Override
+  public Option> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.HIVE);

Review Comment:
   I just copied what HoodieSparkRecordMerger does




Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-29 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469931251


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord {
+
+  private boolean copy;
+  private boolean isDeleted;
+
+  public boolean isDeleted() {
+return isDeleted;
+  }
+
+  private final ArrayWritableObjectInspector objectInspector;
+
+  private final ObjectInspectorCache objectInspectorCache;
+
+  protected Schema schema;
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data);
+this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+this.objectInspectorCache = objectInspectorCache;
+this.schema = schema;
+this.copy = false;
+isDeleted = data == null;
+  }
+
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HoodieOperation operation, boolean isCopy,
+   ArrayWritableObjectInspector objectInspector, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data, operation, Option.empty());
+this.schema = schema;
+this.copy = isCopy;
+isDeleted = data == null;
+this.objectInspector = objectInspector;
+this.objectInspectorCache = objectInspectorCache;
+  }
+
+  @Override
+  public HoodieRecord newInstance() {
+return new HoodieHiveRecord(this.key, this.data, this.schema, 
this.operation, this.copy, this.objectInspector, this.objectInspectorCache);
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key, 
HoodieOperation op) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public Comparable getOrderingValue(Schema recordSchema, Properties props) 
{
+String orderingField = ConfigUtils.getOrderingField(props);
+if (orderingField == null) {
+  return 0;
+  //throw new I

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-29 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469930915


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord {
+
+  private boolean copy;
+  private boolean isDeleted;
+
+  public boolean isDeleted() {
+return isDeleted;
+  }
+
+  private final ArrayWritableObjectInspector objectInspector;
+
+  private final ObjectInspectorCache objectInspectorCache;
+
+  protected Schema schema;
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data);
+this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+this.objectInspectorCache = objectInspectorCache;
+this.schema = schema;
+this.copy = false;
+isDeleted = data == null;
+  }
+
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HoodieOperation operation, boolean isCopy,
+   ArrayWritableObjectInspector objectInspector, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data, operation, Option.empty());
+this.schema = schema;
+this.copy = isCopy;
+isDeleted = data == null;
+this.objectInspector = objectInspector;
+this.objectInspectorCache = objectInspectorCache;
+  }
+
+  @Override
+  public HoodieRecord newInstance() {
+return new HoodieHiveRecord(this.key, this.data, this.schema, 
this.operation, this.copy, this.objectInspector, this.objectInspectorCache);
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key, 
HoodieOperation op) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public Comparable getOrderingValue(Schema recordSchema, Properties props) 
{
+String orderingField = ConfigUtils.getOrderingField(props);
+if (orderingField == null) {
+  return 0;
+  //throw new I

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-29 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469919029


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends 
HoodieReaderContext {
+  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator 
readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map hosts;
+  protected final Map columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader firstRecordReader = null;
+
+  private final List partitionCols;
+  private final Set partitionColSet;
+
+  private final String recordKeyField;
+  protected 
HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator 
readerCreator,
+InputSplit split,
+JobConf jobConf,
+Reporter reporter,
+Schema writerSchema,
+Map hosts,
+HoodieTableMetaClient metaClient) {
+this.readerCreator = readerCreator;
+this.split = split;
+this.jobConf = jobConf;
+this.reporter = reporter;
+this.writerSchema = writerSchema;
+this.hosts = hosts;
+this.partitionCols = 
HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
+.filter(n -> writerSchema.getField(n) != 
null).collect(Collectors.toList());
+this.partitionColSet = new HashSet<>(this.partitionCols);
+String tableName = metaClient.getTableConfig().getTableName();
+recordKeyField = metaClient.getTableConfig().populateMetaFields()
+? HoodieRecord.RECORD_KEY_METADATA_FIELD
+: assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
+this.objectInspectorCache = 
HoodieArrayWritableAvroUtils.getCacheForTabl

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-29 Thread via GitHub


xicm commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469248006


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * To read value from an ArrayWritable, an ObjectInspector is needed.
+ * Object inspectors are cached here or created using the column type map.
+ */
+public class ObjectInspectorCache {
+  private final Map columnTypeMap = new HashMap<>();
+  private final Cache
+  objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build();
+
+  public Map getColumnTypeMap() {
+return columnTypeMap;
+  }
+
+  public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) {
+//From AbstractRealtimeRecordReader#prepareHiveAvroSerializer
+// hive will append virtual columns at the end of column list. we should 
remove those columns.
+// eg: current table is col1, col2, col3; 
jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3 
,BLOCK__OFFSET__INSIDE__FILE ...
+Set writerSchemaColNames = tableSchema.getFields().stream().map(f 
-> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+List columnNameList = 
Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+List columnTypeList =  
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+
+int columnNameListLen = columnNameList.size() - 1;
+for (int i = columnNameListLen; i >= 0; i--) {
+  String lastColName = columnNameList.get(columnNameList.size() - 1);
+  // virtual columns will only append at the end of column list. it will 
be ok to break the loop.
+  if (writerSchemaColNames.contains(lastColName)) {
+break;
+  }
+  columnNameList.remove(columnNameList.size() - 1);
+  columnTypeList.remove(columnTypeList.size() - 1);
+}
+
+//Use columnNameList.size() instead of columnTypeList because the type 
list is longer for some reason
+IntStream.range(0, columnNameList.size()).boxed().forEach(i -> 
columnTypeMap.put(columnNameList.get(i),
+
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0)));
+
+StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+ArrayWritableObjectInspector objectInspector = new 
ArrayWritableObjectInspector(rowTypeInfo);

Review Comment:
   There may be compatibility issues between Hive 2 and Hive 3, e.g. for DATE 
and TIMESTAMP types.
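   As a rough illustration only (not code from this PR), one way to surface the 
concern is to flag DATE/TIMESTAMP columns in the column type map before the 
ObjectInspector is built, so any Hive 2 vs Hive 3 differences (e.g. DateWritable 
vs DateWritableV2) can be handled deliberately; the helper class below and its 
name are assumptions:

import java.util.Map;

import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

// Hypothetical helper (not part of the PR): detects DATE/TIMESTAMP columns in a
// column type map so the caller can decide whether Hive-version-specific handling
// (e.g. DateWritable vs DateWritableV2 in Hive 3) is needed.
public final class TemporalColumnCheck {
  private TemporalColumnCheck() {
  }

  public static boolean hasTemporalColumns(Map<String, TypeInfo> columnTypeMap) {
    return columnTypeMap.values().stream()
        .map(TypeInfo::getTypeName)
        .anyMatch(typeName -> serdeConstants.DATE_TYPE_NAME.equals(typeName)
            || serdeConstants.TIMESTAMP_TYPE_NAME.equals(typeName));
  }
}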




Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-29 Thread via GitHub


xicm commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469244004


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HoodieFileGroupReaderRecordReader implements 
RecordReader  {
+
+  public interface HiveReaderCreator {
+org.apache.hadoop.mapred.RecordReader 
getRecordReader(
+final org.apache.hadoop.mapred.InputSplit split,
+final org.apache.hadoop.mapred.JobConf job,
+final org.apache.hadoop.mapred.Reporter reporter
+) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+   final InputSplit split,
+   final JobConf jobConf,
+   final Reporter reporter) throws 
IOException {
+this.jobConfCopy = new JobConf(jobConf);
+HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+Set partitionColumns = new 
HashSet<>(getPartitionFieldNames(jobConfCopy));
+this.inputSplit = split;
+
+FileSplit fileSplit = (FileSplit) split;
+String tableBasePath = getTableBasePath(split, jobConfCopy);
+HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+.setConf(jobConfCopy)
+.setBasePath(tableBasePath)
+.build();
+String latestCommitTime = getLatestCommitTime(split, metaClient);
+Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, 
latestCommitTime);
+Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+Map hosts = new HashMap<>();
+this.readerContext = new HiveHoodieReade

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


xicm commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469185474


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java:
##
@@ -91,9 +94,42 @@ private void initAvroInputFormat() {
 }
   }
 
+  private static boolean checkIfHudiTable(final InputSplit split, final 
JobConf job) {
+try {
+  Option tablePathOpt = TablePathUtils.getTablePath(((FileSplit) 
split).getPath(), job);
+  if (!tablePathOpt.isPresent()) {
+return false;
+  }
+  return tablePathOpt.get().getFileSystem(job).exists(new 
Path(tablePathOpt.get(), HoodieTableMetaClient.METAFOLDER_NAME));
+} catch (IOException e) {
+  return false;
+}
+  }
+
   @Override
   public RecordReader getRecordReader(final 
InputSplit split, final JobConf job,
final 
Reporter reporter) throws IOException {
+
+if (HoodieFileGroupReaderRecordReader.useFilegroupReader(job)) {
+  try {

Review Comment:
   We need to confirm that the values of "hive.io.file.readcolumn.names" and 
"hive.io.file.readcolumn.ids" in the JobConf contain the partition fields; if 
not, Hive 3 partition queries return null. See #7355
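   As a sketch of that check (not the PR's code), the projection properties could 
be verified and any missing partition fields appended; the helper name is 
hypothetical, and it assumes the partition fields are listed in 
serdeConstants.LIST_COLUMNS, with column ids being positions in that list:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper (not part of the PR): ensures the projected column names/ids
// ("hive.io.file.readcolumn.names" / "hive.io.file.readcolumn.ids") include the
// partition fields, since Hive 3 can otherwise return NULL for partition columns.
public final class PartitionProjectionCheck {
  private PartitionProjectionCheck() {
  }

  public static void ensurePartitionColumnsProjected(JobConf job, List<String> partitionFields) {
    List<String> allColumns = splitCsv(job.get(serdeConstants.LIST_COLUMNS, ""));
    List<String> readNames = splitCsv(job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
    List<String> readIds = splitCsv(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, ""));
    for (String field : partitionFields) {
      if (!readNames.contains(field) && allColumns.contains(field)) {
        // Column ids are positions within serdeConstants.LIST_COLUMNS.
        readNames.add(field);
        readIds.add(String.valueOf(allColumns.indexOf(field)));
      }
    }
    job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, String.join(",", readNames));
    job.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, String.join(",", readIds));
  }

  private static List<String> splitCsv(String csv) {
    List<String> values = new ArrayList<>();
    for (String value : csv.split(",")) {
      if (!value.isEmpty()) {
        values.add(value);
      }
    }
    return values;
  }
}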
   




Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


xicm commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469185474


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java:
##
@@ -91,9 +94,42 @@ private void initAvroInputFormat() {
 }
   }
 
+  private static boolean checkIfHudiTable(final InputSplit split, final 
JobConf job) {
+try {
+  Option tablePathOpt = TablePathUtils.getTablePath(((FileSplit) 
split).getPath(), job);
+  if (!tablePathOpt.isPresent()) {
+return false;
+  }
+  return tablePathOpt.get().getFileSystem(job).exists(new 
Path(tablePathOpt.get(), HoodieTableMetaClient.METAFOLDER_NAME));
+} catch (IOException e) {
+  return false;
+}
+  }
+
   @Override
   public RecordReader getRecordReader(final 
InputSplit split, final JobConf job,
final 
Reporter reporter) throws IOException {
+
+if (HoodieFileGroupReaderRecordReader.useFilegroupReader(job)) {
+  try {

Review Comment:
   We need to confirm that the value of "hive.io.file.readcolumn.names" in the 
JobConf contains the partition fields; if not, Hive 3 partition queries return 
null. See #7355
   




Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469035209


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * To read value from an ArrayWritable, an ObjectInspector is needed.
+ * Object inspectors are cached here or created using the column type map.
+ */
+public class ObjectInspectorCache {
+  private final Map columnTypeMap = new HashMap<>();
+  private final Cache
+  objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build();
+
+  public Map getColumnTypeMap() {
+return columnTypeMap;
+  }
+
+  public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) {
+//From AbstractRealtimeRecordReader#prepareHiveAvroSerializer
+// hive will append virtual columns at the end of column list. we should 
remove those columns.
+// eg: current table is col1, col2, col3; 
jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3 
,BLOCK__OFFSET__INSIDE__FILE ...
+Set writerSchemaColNames = tableSchema.getFields().stream().map(f 
-> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+List columnNameList = 
Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+List columnTypeList =  
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+
+int columnNameListLen = columnNameList.size() - 1;
+for (int i = columnNameListLen; i >= 0; i--) {
+  String lastColName = columnNameList.get(columnNameList.size() - 1);
+  // virtual columns will only append at the end of column list. it will 
be ok to break the loop.
+  if (writerSchemaColNames.contains(lastColName)) {
+break;
+  }
+  columnNameList.remove(columnNameList.size() - 1);
+  columnTypeList.remove(columnTypeList.size() - 1);
+}
+
+//Use columnNameList.size() instead of columnTypeList because the type 
list is longer for some reason
+IntStream.range(0, columnNameList.size()).boxed().forEach(i -> 
columnTypeMap.put(columnNameList.get(i),
+
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0)));
+
+StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+ArrayWritableObjectInspector objectInspector = new 
ArrayWritableObjectInspector(rowTypeInfo);

Review Comment:
   @xicm @xiarixiaoyao can you help confirm the correctness?




Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469033287


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+public class HoodieHiveRecordMerger implements HoodieRecordMerger {
+  @Override
+  public Option> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.HIVE);

Review Comment:
   Not sure why we need to override the merge logic?
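   For context, a rough sketch (not this PR's code) of the ordering-value pattern 
that such mergers typically follow: keep the newer record unless the older one has 
a strictly greater ordering value. The class and method names below are assumptions:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;

// Hypothetical sketch (not part of the PR): merge two versions of a record by
// comparing their ordering values, preferring the newer record on ties.
public final class OrderingValueMergeSketch {
  private OrderingValueMergeSketch() {
  }

  @SuppressWarnings({"rawtypes", "unchecked"})
  public static Option<Pair<HoodieRecord, Schema>> mergeByOrderingValue(
      HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema,
      TypedProperties props) {
    Comparable olderOrdering = older.getOrderingValue(oldSchema, props);
    Comparable newerOrdering = newer.getOrderingValue(newSchema, props);
    if (olderOrdering.compareTo(newerOrdering) > 0) {
      return Option.of(Pair.of(older, oldSchema));
    }
    return Option.of(Pair.of(newer, newSchema));
  }
}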




Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469032745


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord {
+
+  private boolean copy;
+  private boolean isDeleted;
+
+  public boolean isDeleted() {
+return isDeleted;
+  }
+
+  private final ArrayWritableObjectInspector objectInspector;
+
+  private final ObjectInspectorCache objectInspectorCache;
+
+  protected Schema schema;
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data);
+this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+this.objectInspectorCache = objectInspectorCache;
+this.schema = schema;
+this.copy = false;
+isDeleted = data == null;
+  }
+
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HoodieOperation operation, boolean isCopy,
+   ArrayWritableObjectInspector objectInspector, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data, operation, Option.empty());
+this.schema = schema;
+this.copy = isCopy;
+isDeleted = data == null;
+this.objectInspector = objectInspector;
+this.objectInspectorCache = objectInspectorCache;
+  }
+
+  @Override
+  public HoodieRecord newInstance() {
+return new HoodieHiveRecord(this.key, this.data, this.schema, 
this.operation, this.copy, this.objectInspector, this.objectInspectorCache);
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key, 
HoodieOperation op) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public Comparable getOrderingValue(Schema recordSchema, Properties props) 
{
+String orderingField = ConfigUtils.getOrderingField(props);
+if (orderingField == null) {
+  return 0;
+  //throw ne

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469032673


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class HoodieHiveRecord extends HoodieRecord {
+
+  private boolean copy;
+  private boolean isDeleted;
+
+  public boolean isDeleted() {
+return isDeleted;
+  }
+
+  private final ArrayWritableObjectInspector objectInspector;
+
+  private final ObjectInspectorCache objectInspectorCache;
+
+  protected Schema schema;
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data);
+this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+this.objectInspectorCache = objectInspectorCache;
+this.schema = schema;
+this.copy = false;
+isDeleted = data == null;
+  }
+
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HoodieOperation operation, boolean isCopy,
+   ArrayWritableObjectInspector objectInspector, 
ObjectInspectorCache objectInspectorCache) {
+super(key, data, operation, Option.empty());
+this.schema = schema;
+this.copy = isCopy;
+isDeleted = data == null;
+this.objectInspector = objectInspector;
+this.objectInspectorCache = objectInspectorCache;
+  }
+
+  @Override
+  public HoodieRecord newInstance() {
+return new HoodieHiveRecord(this.key, this.data, this.schema, 
this.operation, this.copy, this.objectInspector, this.objectInspectorCache);
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key, 
HoodieOperation op) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord newInstance(HoodieKey key) {
+throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public Comparable getOrderingValue(Schema recordSchema, Properties props) 
{
+String orderingField = ConfigUtils.getOrderingField(props);
+if (orderingField == null) {
+  return 0;
+  //throw ne

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469031742


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HoodieFileGroupReaderRecordReader implements 
RecordReader  {
+
+  public interface HiveReaderCreator {
+org.apache.hadoop.mapred.RecordReader 
getRecordReader(
+final org.apache.hadoop.mapred.InputSplit split,
+final org.apache.hadoop.mapred.JobConf job,
+final org.apache.hadoop.mapred.Reporter reporter
+) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator reverseProjection;
+
+  public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator,
+   final InputSplit split,
+   final JobConf jobConf,
+   final Reporter reporter) throws 
IOException {
+this.jobConfCopy = new JobConf(jobConf);
+HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+Set partitionColumns = new 
HashSet<>(getPartitionFieldNames(jobConfCopy));
+this.inputSplit = split;
+
+FileSplit fileSplit = (FileSplit) split;
+String tableBasePath = getTableBasePath(split, jobConfCopy);
+HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+.setConf(jobConfCopy)
+.setBasePath(tableBasePath)
+.build();
+String latestCommitTime = getLatestCommitTime(split, metaClient);
+Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, 
latestCommitTime);
+Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+Map hosts = new HashMap<>();
+this.readerContext = new HiveHoodie

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469030424


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map<String, String[]> hosts;
+  protected final Map<String, TypeInfo> columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
+
+  private final List<String> partitionCols;
+  private final Set<String> partitionColSet;
+
+  private final String recordKeyField;
+
+  protected HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator,
+                                    InputSplit split,
+                                    JobConf jobConf,
+                                    Reporter reporter,
+                                    Schema writerSchema,
+                                    Map<String, String[]> hosts,
+                                    HoodieTableMetaClient metaClient) {
+    this.readerCreator = readerCreator;
+    this.split = split;
+    this.jobConf = jobConf;
+    this.reporter = reporter;
+    this.writerSchema = writerSchema;
+    this.hosts = hosts;
+    this.partitionCols = HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream()
+        .filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
+    this.partitionColSet = new HashSet<>(this.partitionCols);
+    String tableName = metaClient.getTableConfig().getTableName();
+    recordKeyField = metaClient.getTableConfig().populateMetaFields()
+        ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+        : assertSingleKey(metaClient.getTableConfig().getRecordKeyFields());
+    this.objectInspectorCache = HoodieArrayWritableAvroUtils.getCacheForT
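
The snippet is truncated here. For orientation, a minimal sketch of what the assertSingleKey helper above presumably does (an assumption based on its call site, not the PR's exact code):

```java
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;

public class RecordKeyFieldSketch {
  // When meta fields are not populated, the Hive reader context can only resolve records by a
  // single configured record key column, so any other configuration is rejected up front.
  static String assertSingleKey(Option<String[]> recordKeyFields) {
    ValidationUtils.checkState(recordKeyFields.isPresent(), "No record key field configured for the table");
    String[] fields = recordKeyFields.get();
    ValidationUtils.checkArgument(fields.length == 1, "The file group reader for Hive requires a single record key field");
    return fields[0];
  }
}
```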

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469029597


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,260 @@

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-28 Thread via GitHub


danny0405 commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1469029149


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##
@@ -0,0 +1,260 @@

Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-19 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1460108785


##
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java:
##
@@ -116,6 +117,7 @@ public void setUp() {
 hadoopConf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
 baseJobConf = new JobConf(hadoopConf);
 baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 
String.valueOf(1024 * 1024));
+baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");

Review Comment:
   Tried again and still couldn't get them working. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-19 Thread via GitHub


hudi-bot commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1901370527

   
   ## CI report:
   
   * 99517e23baa60a6a0602e9daf7f522f3c1dcfa1e UNKNOWN
   * 15ed1ad17c8b99804d6e404342a11fab6e212935 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=22078)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-19 Thread via GitHub


hudi-bot commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1901057461

   
   ## CI report:
   
   * 99517e23baa60a6a0602e9daf7f522f3c1dcfa1e UNKNOWN
   * 8f08fa5850559a29da9aca426d74e188daf995f5 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=21969)
 
   * 15ed1ad17c8b99804d6e404342a11fab6e212935 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=22078)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-19 Thread via GitHub


hudi-bot commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1901044565

   
   ## CI report:
   
   * 99517e23baa60a6a0602e9daf7f522f3c1dcfa1e UNKNOWN
   * 8f08fa5850559a29da9aca426d74e188daf995f5 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=21969)
 
   * 15ed1ad17c8b99804d6e404342a11fab6e212935 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-19 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1459475819


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java:
##
@@ -101,6 +102,9 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, 
JobConf job) {
   throw new HoodieException("Could not create HoodieRealtimeRecordReader 
on path " + this.split.getPath(), e);
 }
 prepareHiveAvroSerializer();
+if (HoodieFileGroupReaderRecordReader.useFilegroupReader(jobConf)) {

Review Comment:
   We can get rid of this. I added it because, when I was testing, I wanted to make sure I was actually using the new implementation.
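
For readers following the thread, the guard in question most likely just consults the reader config on the JobConf; a hedged sketch (assumed, not necessarily the PR's exact implementation):

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.config.HoodieReaderConfig;

public final class FileGroupReaderFlagSketch {
  // Returns true when the new file group reader should be used for this query,
  // falling back to the config's default when the job does not set the key explicitly.
  public static boolean useFilegroupReader(final JobConf jobConf) {
    return jobConf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
        HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue());
  }
}
```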



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-15 Thread via GitHub


hudi-bot commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1893068084

   
   ## CI report:
   
   * 99517e23baa60a6a0602e9daf7f522f3c1dcfa1e UNKNOWN
   * 8f08fa5850559a29da9aca426d74e188daf995f5 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=21969)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-15 Thread via GitHub


hudi-bot commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1892965427

   
   ## CI report:
   
   * 99517e23baa60a6a0602e9daf7f522f3c1dcfa1e UNKNOWN
   * c0fbf8d4c4492880cc60f659fdba825df5395900 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=21829)
 
   * 8f08fa5850559a29da9aca426d74e188daf995f5 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=21969)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-15 Thread via GitHub


hudi-bot commented on PR #10422:
URL: https://github.com/apache/hudi/pull/10422#issuecomment-1892960266

   
   ## CI report:
   
   * 99517e23baa60a6a0602e9daf7f522f3c1dcfa1e UNKNOWN
   * c0fbf8d4c4492880cc60f659fdba825df5395900 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=21829)
 
   * 8f08fa5850559a29da9aca426d74e188daf995f5 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-09 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1446299229


##
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java:
##
@@ -116,6 +117,7 @@ public void setUp() {
 hadoopConf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
 baseJobConf = new JobConf(hadoopConf);
 baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 
String.valueOf(1024 * 1024));
+baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");

Review Comment:
   This test is directly creating the record reader instead of reading through the file format:
   ```
   HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
   ```
   Looking back at this, I think I might be able to update this test to use the new fg reader.
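
A hedged sketch of that alternative, going through the input format so the file-group-reader flag on the JobConf actually decides which reader implementation is built (the exact setup calls are assumptions about how the test could be rewired):

```java
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

public class RealtimeReaderViaInputFormatSketch {
  // Creating the reader via the input format (rather than new-ing HoodieRealtimeRecordReader
  // directly) lets FILE_GROUP_READER_ENABLED on the JobConf choose the implementation.
  static RecordReader<NullWritable, ArrayWritable> open(InputSplit split, JobConf jobConf) throws IOException {
    HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
    inputFormat.setConf(jobConf);
    return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
  }
}
```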



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-09 Thread via GitHub


jonvex commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1446284069


##
packaging/bundle-validation/validate.sh:
##
@@ -93,7 +93,7 @@ test_spark_hadoop_mr_bundles () {
 # save HiveQL query results
 hiveqlresultsdir=/tmp/hadoop-mr-bundle/hiveql/trips/results
 mkdir -p $hiveqlresultsdir
-$HIVE_HOME/bin/beeline --hiveconf 
hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat \
+$HIVE_HOME/bin/beeline --verbose --hiveconf 
hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat \

Review Comment:
   No, I was trying to get it to emit more debugging info.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [HUDI-6787] Implement the HoodieFileGroupReader API for Hive [hudi]

2024-01-08 Thread via GitHub


vinothchandar commented on code in PR #10422:
URL: https://github.com/apache/hudi/pull/10422#discussion_r1445060501


##
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java:
##
@@ -116,6 +117,7 @@ public void setUp() {
 hadoopConf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
 baseJobConf = new JobConf(hadoopConf);
 baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 
String.valueOf(1024 * 1024));
+baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");

Review Comment:
   Why "false"?



##
packaging/bundle-validation/validate.sh:
##
@@ -93,7 +93,7 @@ test_spark_hadoop_mr_bundles () {
 # save HiveQL query results
 hiveqlresultsdir=/tmp/hadoop-mr-bundle/hiveql/trips/results
 mkdir -p $hiveqlresultsdir
-$HIVE_HOME/bin/beeline --hiveconf 
hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat \
+$HIVE_HOME/bin/beeline --verbose --hiveconf 
hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat \

Review Comment:
   Does this need to be checked in?



##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java:
##
@@ -91,9 +94,42 @@ private void initAvroInputFormat() {
 }
   }
 
+  private static boolean checkTableIsHudi(final InputSplit split, final 
JobConf job) {

Review Comment:
   rename: checkIfHudiTable



##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ObjectInspectorCache {

Review Comment:
   Java docs.
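
Presumably this asks for class-level Javadoc on ObjectInspectorCache; a hedged sketch of what such a doc might say (the wording is inferred from the class's imports and usage, not taken from the PR):

```java
/**
 * Caches Hive ArrayWritableObjectInspector instances (keyed by the Avro schema being read)
 * together with the table's column-name-to-TypeInfo mapping, so that converting
 * ArrayWritable rows in the file group reader does not rebuild object inspectors per record.
 */
public class ObjectInspectorCache {
  // ...
}
```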



##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##
@@ -227,23 +231,31 @@ private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile)
     BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
     Pair<List<Schema.Field>, List<Schema.Field>> requiredFields = getDataAndMetaCols(requiredSchema);
     Pair<List<Schema.Field>, List<Schema.Field>> allFields = getDataAndMetaCols(dataSchema);
-
-    Option<ClosableIterator<T>> dataFileIterator = requiredFields.getRight().isEmpty() ? Option.empty() :
-        Option.of(readerContext.getFileRecordIterator(dataFile.getHadoopPath(), 0, dataFile.getFileLen(),
-            createSchemaFromFields(allFields.getRight()), createSchemaFromFields(requiredFields.getRight()), hadoopConf));
-
-    Option<ClosableIterator<T>> skeletonFileIterator = requiredFields.getLeft().isEmpty() ? Option.empty() :
-        Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, baseFile.getFileLen(),
-            createSchemaFromFields(allFields.getLeft()), createSchemaFromFields(requiredFields.getLeft()), hadoopConf));
+    Option<Pair<ClosableIterator<T>, Schema>> dataFileIterator =

Review Comment:
   It's cool that we are able to add a new engine without many changes to this class.



##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java:
##
@@ -101,6 +102,9 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, 
JobConf job) {
   throw new HoodieException("Could not create HoodieRealtimeRecordReader 
on path " + this.split.getPath(), e);
 }
 prepareHiveAvroSerializer();
+if (HoodieFileGroupReaderRecordReader.useFilegroupReader(jo