Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-06-12 Thread via GitHub


parthchandra merged PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-06-11 Thread via GitHub


parthchandra commented on PR #1815:
URL: 
https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2964554260

   Removed commented-out code that is no longer needed. Thanks for the second review @andygrove. Will merge after CI completes.





Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-06-11 Thread via GitHub


andygrove commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2141233325


##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -424,6 +425,187 @@ public void init() throws Throwable {
     isInitialized = true;
   }
 
+  private ParquetColumn getParquetColumn(MessageType schema, StructType sparkSchema) {
+    // We use a different config from the config that is passed in.
+    // This follows the setting used in Spark's SpecificParquetRecordReaderBase
+    Configuration config = new Configuration();
+    config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(config);
+    return converter.convertParquetColumn(schema, Option.apply(sparkSchema));
+  }
+
+  private Map<Integer, List<Type>> getIdToParquetFieldMap(GroupType type) {
+    return type.getFields().stream()
+        .filter(f -> f.getId() != null)
+        .collect(Collectors.groupingBy(f -> f.getId().intValue()));
+  }
+
+  private Map<String, List<Type>> getCaseSensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream().collect(Collectors.toMap(Type::getName, Arrays::asList));
+  }
+
+  private Map<String, List<Type>> getCaseInsensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream()
+        .collect(Collectors.groupingBy(f -> f.getName().toLowerCase(Locale.ROOT)));
+  }
+
+  private Type getMatchingParquetFieldById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap,
+      boolean isCaseSensitive) {
+    List<Type> matched = null;
+    int fieldId = 0;
+    if (ParquetUtils.hasFieldId(f)) {
+      fieldId = ParquetUtils.getFieldId(f);
+      matched = idToParquetFieldMap.get(fieldId);
+    } else {
+      String fieldName = isCaseSensitive ? f.name() : f.name().toLowerCase(Locale.ROOT);
+      matched = nameToParquetFieldMap.get(fieldName);
+    }
+
+    if (matched == null || matched.isEmpty()) {
+      return null;
+    }
+    if (matched.size() > 1) {
+      // Need to fail if there is ambiguity, i.e. more than one field is matched
+      String parquetTypesString =
+          matched.stream().map(Type::getName).collect(Collectors.joining("[", ", ", "]"));
+      throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+          fieldId, parquetTypesString);
+    } else {
+      return matched.get(0);
+    }
+  }
+
+  // Derived from CometParquetReadSupport.matchFieldId
+  private String getMatchingNameById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap /*, Map nameMap*/,

Review Comment:
   is this commented out code still needed?
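   For reference, the id-then-name matching that the hunk above implements can be exercised standalone. The sketch below uses a hypothetical `Field` record as a stand-in for parquet-mr's `Type` and a plain `IllegalStateException` in place of `QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError`; only the grouping and ambiguity logic mirrors the diff.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

public class FieldIdMatch {
  // Hypothetical stand-in for parquet-mr's Type: a name plus an optional field id.
  record Field(String name, Integer id) {}

  // Mirrors getIdToParquetFieldMap: fields without an id are skipped.
  static Map<Integer, List<Field>> byId(List<Field> fields) {
    return fields.stream()
        .filter(f -> f.id() != null)
        .collect(Collectors.groupingBy(Field::id));
  }

  // Mirrors getCaseInsensitiveParquetFieldMap.
  static Map<String, List<Field>> byLowerName(List<Field> fields) {
    return fields.stream()
        .collect(Collectors.groupingBy(f -> f.name().toLowerCase(Locale.ROOT)));
  }

  // Match by field id when the Spark field carries one, otherwise fall back to a
  // case-insensitive name match; null means no match, more than one match is an error.
  static Field match(String name, Integer id, List<Field> fields) {
    List<Field> matched =
        (id != null) ? byId(fields).get(id) : byLowerName(fields).get(name.toLowerCase(Locale.ROOT));
    if (matched == null || matched.isEmpty()) {
      return null;
    }
    if (matched.size() > 1) {
      throw new IllegalStateException(
          "duplicate field id match: "
              + matched.stream().map(Field::name).collect(Collectors.joining(", ", "[", "]")));
    }
    return matched.get(0);
  }

  public static void main(String[] args) {
    List<Field> schema = List.of(new Field("a", 1), new Field("B", null), new Field("a_dup", 1));
    // No field id on the Spark side: case-insensitive name fallback finds "B".
    System.out.println(match("b", null, schema).name());
    try {
      match("x", 1, schema); // two parquet fields share id 1 -> ambiguous
    } catch (IllegalStateException e) {
      System.out.println("ambiguous");
    }
  }
}
```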






Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-06-11 Thread via GitHub


andygrove commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2141233094


##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -424,6 +425,187 @@ public void init() throws Throwable {
     isInitialized = true;
   }
 
+  private ParquetColumn getParquetColumn(MessageType schema, StructType sparkSchema) {
+    // We use a different config from the config that is passed in.
+    // This follows the setting used in Spark's SpecificParquetRecordReaderBase
+    Configuration config = new Configuration();
+    config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(config);
+    return converter.convertParquetColumn(schema, Option.apply(sparkSchema));
+  }
+
+  private Map<Integer, List<Type>> getIdToParquetFieldMap(GroupType type) {
+    return type.getFields().stream()
+        .filter(f -> f.getId() != null)
+        .collect(Collectors.groupingBy(f -> f.getId().intValue()));
+  }
+
+  private Map<String, List<Type>> getCaseSensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream().collect(Collectors.toMap(Type::getName, Arrays::asList));
+  }
+
+  private Map<String, List<Type>> getCaseInsensitiveParquetFieldMap(GroupType schema) {
+    return schema.getFields().stream()
+        .collect(Collectors.groupingBy(f -> f.getName().toLowerCase(Locale.ROOT)));
+  }
+
+  private Type getMatchingParquetFieldById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap,
+      boolean isCaseSensitive) {
+    List<Type> matched = null;
+    int fieldId = 0;
+    if (ParquetUtils.hasFieldId(f)) {
+      fieldId = ParquetUtils.getFieldId(f);
+      matched = idToParquetFieldMap.get(fieldId);
+    } else {
+      String fieldName = isCaseSensitive ? f.name() : f.name().toLowerCase(Locale.ROOT);
+      matched = nameToParquetFieldMap.get(fieldName);
+    }
+
+    if (matched == null || matched.isEmpty()) {
+      return null;
+    }
+    if (matched.size() > 1) {
+      // Need to fail if there is ambiguity, i.e. more than one field is matched
+      String parquetTypesString =
+          matched.stream().map(Type::getName).collect(Collectors.joining("[", ", ", "]"));
+      throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+          fieldId, parquetTypesString);
+    } else {
+      return matched.get(0);
+    }
+  }
+
+  // Derived from CometParquetReadSupport.matchFieldId
+  private String getMatchingNameById(
+      StructField f,
+      Map<Integer, List<Type>> idToParquetFieldMap,
+      Map<String, List<Type>> nameToParquetFieldMap /*, Map nameMap*/,
+      boolean isCaseSensitive) {
+    Type matched =
+        getMatchingParquetFieldById(f, idToParquetFieldMap, nameToParquetFieldMap, isCaseSensitive);
+
+    // When there is no ID match, we use a fake name to avoid a name match by accident
+    // We need this name to be unique as well, otherwise there will be type conflicts
+    if (matched == null /*|| matched.isEmpty()*/) {

Review Comment:
   is this commented out code still needed?






Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-05-30 Thread via GitHub


parthchandra commented on PR #1815:
URL: 
https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2922733351

   Changing this to draft while I look into the CI failures





Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-05-29 Thread via GitHub


parthchandra commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114910872


##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[] columnReaders) {
   public void init() throws Throwable {
 
     conf.set("spark.sql.parquet.binaryAsString", "false");
-    conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+    conf.set("spark.sql.parquet.int96AsTimestamp", "true");

Review Comment:
   Rethinking this along with investigating the ci failures






Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-05-29 Thread via GitHub


parthchandra commented on PR #1815:
URL: 
https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2920712348

   @mbutrovich need your help here. CI is failing due to tests involving default values. Should these tests be enabled?





Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-05-29 Thread via GitHub


parthchandra commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114833029


##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[] columnReaders) {
   public void init() throws Throwable {
 
     conf.set("spark.sql.parquet.binaryAsString", "false");
-    conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+    conf.set("spark.sql.parquet.int96AsTimestamp", "true");

Review Comment:
   https://github.com/apache/datafusion-comet/issues/1816






Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-05-29 Thread via GitHub


parthchandra commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114830345


##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[] columnReaders) {
   public void init() throws Throwable {
 
     conf.set("spark.sql.parquet.binaryAsString", "false");
-    conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+    conf.set("spark.sql.parquet.int96AsTimestamp", "true");

Review Comment:
   Let me log a follow-up PR to investigate this further. As things stand, without these configs, many Spark tests fail.






Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-05-29 Thread via GitHub


andygrove commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114468324


##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[] columnReaders) {
   public void init() throws Throwable {
 
     conf.set("spark.sql.parquet.binaryAsString", "false");
-    conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+    conf.set("spark.sql.parquet.int96AsTimestamp", "true");

Review Comment:
   I still have questions about these configs. In the unlikely event that a user explicitly sets `spark.sql.parquet.int96AsTimestamp=false`, shouldn't we respect that?
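
   One way to respect an explicit user setting would be to fall back to the hardcoded value only when the session config carries no entry for the key. A minimal sketch, with a plain `Map` standing in for the Hadoop/Spark configuration and a hypothetical `effectiveBoolean` helper (the config key name is the real Spark key; everything else is illustrative):

```java
import java.util.Map;

public class ConfFallback {
  // Hypothetical helper: prefer an explicitly set session value; otherwise use
  // the default the reader would have hardcoded.
  static boolean effectiveBoolean(Map<String, String> sessionConf, String key, boolean hardcodedDefault) {
    String v = sessionConf.get(key);
    return (v != null) ? Boolean.parseBoolean(v) : hardcodedDefault;
  }

  public static void main(String[] args) {
    String key = "spark.sql.parquet.int96AsTimestamp";
    // User explicitly set false: respected instead of the hardcoded true.
    System.out.println(effectiveBoolean(Map.of(key, "false"), key, true)); // false
    // Not set by the user: fall back to the hardcoded default.
    System.out.println(effectiveBoolean(Map.of(), key, true)); // true
  }
}
```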






Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]

2025-05-29 Thread via GitHub


codecov-commenter commented on PR #1815:
URL: 
https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2920187518

   ## [Codecov](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `0%` with `2 lines` in your changes missing coverage. Please review.
   > Project coverage is 32.09%. Comparing base [(`f09f8af`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/f09f8af64c6599255e116a376f4f008f2fd63b43?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`b455267`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/b455267e6f4e3818a111e66b2768d61101a8587c?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 227 commits behind head on main.

   | [Files with missing lines](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...va/org/apache/comet/parquet/NativeBatchReader.java](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fcomet%2Fparquet%2FNativeBatchReader.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jb21ldC9wYXJxdWV0L05hdGl2ZUJhdGNoUmVhZGVyLmphdmE=) | 0.00% | [2 Missing :warning:](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   Additional details and impacted files
   
   
   ```diff
   @@             Coverage Diff              @@
   ##               main    #1815       +/-   ##
   =============================================
   - Coverage     56.12%   32.09%   -24.03%
   + Complexity      976      650      -326
   =============================================
     Files           119      129       +10
     Lines         11743    12617      +874
     Branches       2251     2348       +97
   =============================================
   - Hits           6591     4050     -2541
   - Misses         4012     7776     +3764
   + Partials       1140      791      -349
   ```
   
   
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).

   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).

   :rocket: New features to boost your workflow:

   - :snowflake: [Test Analytics](https://docs.codecov.com/docs/test-analytics): Detect flaky tests, report on failures, and find test suite problems.

