Re: [PR] [HUDI-7498] Fix schema for HoodieTimestampAwareParquetInputFormat [hudi]

2024-04-15 Thread via GitHub


codope commented on PR #10846:
URL: https://github.com/apache/hudi/pull/10846#issuecomment-2056546988

   > @codope Have you made more progress on this fix?
   
   Not yet. We need to fix https://issues.apache.org/jira/browse/HUDI-7554 as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7498] Fix schema for HoodieTimestampAwareParquetInputFormat [hudi]

2024-04-02 Thread via GitHub


yihua commented on PR #10846:
URL: https://github.com/apache/hudi/pull/10846#issuecomment-2033016182

   @codope Have you made more progress on this fix?
   





Re: [PR] [HUDI-7498] Fix schema for HoodieTimestampAwareParquetInputFormat [hudi]

2024-03-11 Thread via GitHub


danny0405 commented on code in PR #10846:
URL: https://github.com/apache/hudi/pull/10846#discussion_r1520743894


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java:
##
@@ -38,24 +42,61 @@
 import org.apache.parquet.hadoop.ParquetRecordReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePathUnchecked;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.addPartitionFields;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.avroToArrayWritable;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.constructHiveOrderedSchema;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getNameToFieldMap;
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 public class HoodieAvroParquetReader extends RecordReader<Void, ArrayWritable> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieAvroParquetReader.class);
   private final ParquetRecordReader<GenericRecord> parquetRecordReader;
   private Schema baseSchema;
 
   public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException {
-    // get base schema
-    ParquetMetadata fileFooter =
-        ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
-    MessageType messageType = fileFooter.getFileMetaData().getSchema();
-    baseSchema = new AvroSchemaConverter(conf).convert(messageType);
+    Path filePath = ((ParquetInputSplit) inputSplit).getPath();
+    Schema writerSchema;
+    try {
+      HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, filePath);
+      writerSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      LOG.warn("Got writer schema from table schema: {}", writerSchema);
+    } catch (Exception e) {
+      LOG.error("Failed to get writer schema from table schema", e);
+      LOG.warn("Falling back to reading writer schema from parquet file");
+      ParquetMetadata fileFooter = ParquetFileReader.readFooter(conf, filePath, ParquetMetadataConverter.NO_FILTER);
+      MessageType messageType = fileFooter.getFileMetaData().getSchema();
+      writerSchema = new AvroSchemaConverter(conf).convert(messageType);
+    }
+    JobConf jobConf = new JobConf(conf);
+    try {
+      // Add partitioning fields to writer schema for resulting row to contain null values for these fields
+      String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+      List<String> partitioningFields =
+          partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+              : new ArrayList<>();
+      LOG.warn("Got partitioning fields: {}", partitioningFields);
+      writerSchema = addPartitionFields(writerSchema, partitioningFields);
+      LOG.warn("Added partition fields to writer schema: {}", writerSchema);
+    } catch (Exception e) {
+      LOG.error("Failed to add partition fields to writer schema", e);
+    }
+    Map<String, Schema.Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
+    LOG.warn("Got schema fields map: {}", schemaFieldsMap);
+    baseSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS, EMPTY_STRING));
+    LOG.warn("Got hive ordered schema: {}", baseSchema);

Review Comment:
   Do we have some specific test cases?
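
   An aside for context: the hunk above derives the partition field list by splitting the Hive metastore property on "/". A standalone sketch of just that step, using only the JDK; the "year/month/day" value is illustrative, not taken from the PR:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class PartitionFieldSplitSketch {
      public static void main(String[] args) {
        // Stand-in for jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
        // Hive joins the table's partition column names with "/" in that property.
        String partitionFields = "year/month/day";
        List<String> partitioningFields = partitionFields.length() > 0
            ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
            : new ArrayList<>();
        System.out.println(partitioningFields); // prints [year, month, day]
      }
    }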






Re: [PR] [HUDI-7498] Fix schema for HoodieTimestampAwareParquetInputFormat [hudi]

2024-03-11 Thread via GitHub


codope commented on code in PR #10846:
URL: https://github.com/apache/hudi/pull/10846#discussion_r1520810830


##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java:
##
@@ -38,24 +42,61 @@
 import org.apache.parquet.hadoop.ParquetRecordReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePathUnchecked;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.addPartitionFields;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.avroToArrayWritable;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.constructHiveOrderedSchema;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getNameToFieldMap;
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 public class HoodieAvroParquetReader extends RecordReader<Void, ArrayWritable> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieAvroParquetReader.class);
   private final ParquetRecordReader<GenericRecord> parquetRecordReader;
   private Schema baseSchema;
 
   public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException {
-    // get base schema
-    ParquetMetadata fileFooter =
-        ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
-    MessageType messageType = fileFooter.getFileMetaData().getSchema();
-    baseSchema = new AvroSchemaConverter(conf).convert(messageType);
+    Path filePath = ((ParquetInputSplit) inputSplit).getPath();
+    Schema writerSchema;
+    try {
+      HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, filePath);
+      writerSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      LOG.warn("Got writer schema from table schema: {}", writerSchema);
+    } catch (Exception e) {
+      LOG.error("Failed to get writer schema from table schema", e);
+      LOG.warn("Falling back to reading writer schema from parquet file");
+      ParquetMetadata fileFooter = ParquetFileReader.readFooter(conf, filePath, ParquetMetadataConverter.NO_FILTER);
+      MessageType messageType = fileFooter.getFileMetaData().getSchema();
+      writerSchema = new AvroSchemaConverter(conf).convert(messageType);
+    }
+    JobConf jobConf = new JobConf(conf);
+    try {
+      // Add partitioning fields to writer schema for resulting row to contain null values for these fields
+      String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+      List<String> partitioningFields =
+          partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+              : new ArrayList<>();
+      LOG.warn("Got partitioning fields: {}", partitioningFields);
+      writerSchema = addPartitionFields(writerSchema, partitioningFields);
+      LOG.warn("Added partition fields to writer schema: {}", writerSchema);
+    } catch (Exception e) {
+      LOG.error("Failed to add partition fields to writer schema", e);
+    }
+    Map<String, Schema.Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
+    LOG.warn("Got schema fields map: {}", schemaFieldsMap);
+    baseSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS, EMPTY_STRING));
+    LOG.warn("Got hive ordered schema: {}", baseSchema);

Review Comment:
   This is a tentative fix. Need to add tests. I wanted to verify something for Athena.
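
   For reference, a minimal sketch of the kind of unit test this could use, assuming the utility signatures shown in the hunk above (addPartitionFields, getNameToFieldMap, constructHiveOrderedSchema); the record and field names ("rec", "ts", "dt") are made up:

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
    import org.junit.jupiter.api.Test;

    public class TestHiveOrderedSchemaSketch {
      @Test
      public void hiveOrderedSchemaAppendsPartitionFields() {
        // Writer schema without the partition column.
        Schema writerSchema = SchemaBuilder.record("rec").fields()
            .requiredString("_hoodie_commit_time")
            .requiredLong("ts")
            .endRecord();
        // Partition column "dt" is absent from the writer schema; the fix should
        // append it so the resulting rows can carry null for it.
        Schema withPartitions = HoodieRealtimeRecordReaderUtils.addPartitionFields(
            writerSchema, Collections.singletonList("dt"));
        Map<String, Schema.Field> fieldMap =
            HoodieRealtimeRecordReaderUtils.getNameToFieldMap(withPartitions);
        // Hive column order as it would come from META_TABLE_COLUMNS.
        Schema hiveOrdered = HoodieRealtimeRecordReaderUtils.constructHiveOrderedSchema(
            withPartitions, fieldMap, "_hoodie_commit_time,ts,dt");
        assertEquals(Arrays.asList("_hoodie_commit_time", "ts", "dt"),
            hiveOrdered.getFields().stream().map(Schema.Field::name)
                .collect(Collectors.toList()));
      }
    }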







Re: [PR] [HUDI-7498] Fix schema for HoodieTimestampAwareParquetInputFormat [hudi]

2024-03-11 Thread via GitHub


hudi-bot commented on PR #10846:
URL: https://github.com/apache/hudi/pull/10846#issuecomment-1988331374

   
   ## CI report:
   
   * 5fd58589f4ff39fa6773e181ec6f8f23bcf49867 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=22878)
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-7498] Fix schema for HoodieTimestampAwareParquetInputFormat [hudi]

2024-03-11 Thread via GitHub


hudi-bot commented on PR #10846:
URL: https://github.com/apache/hudi/pull/10846#issuecomment-1988146841

   
   ## CI report:
   
   * 5fd58589f4ff39fa6773e181ec6f8f23bcf49867 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=22878)
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-7498] Fix schema for HoodieTimestampAwareParquetInputFormat [hudi]

2024-03-11 Thread via GitHub


hudi-bot commented on PR #10846:
URL: https://github.com/apache/hudi/pull/10846#issuecomment-1988126638

   
   ## CI report:
   
   * 5fd58589f4ff39fa6773e181ec6f8f23bcf49867 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   

