Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
parthchandra merged PR #1815: URL: https://github.com/apache/datafusion-comet/pull/1815 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
parthchandra commented on PR #1815: URL: https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2964554260 Removed commented-out code that is no longer needed. Thanks for the second review, @andygrove. Will merge after CI completes.
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
andygrove commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2141233325
##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -424,6 +425,187 @@ public void init() throws Throwable {
isInitialized = true;
}
+ private ParquetColumn getParquetColumn(MessageType schema, StructType
sparkSchema) {
+// We use a different config from the config that is passed in.
+// This follows the setting used in Spark's
SpecificParquetRecordReaderBase
+Configuration config = new Configuration();
+config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(),
false);
+config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+ParquetToSparkSchemaConverter converter = new
ParquetToSparkSchemaConverter(config);
+return converter.convertParquetColumn(schema, Option.apply(sparkSchema));
+ }
+
+ private Map<Integer, List<Type>> getIdToParquetFieldMap(GroupType type) {
+return type.getFields().stream()
+.filter(f -> f.getId() != null)
+.collect(Collectors.groupingBy(f -> f.getId().intValue()));
+ }
+
+ private Map<String, List<Type>> getCaseSensitiveParquetFieldMap(GroupType
schema) {
+return schema.getFields().stream().collect(Collectors.toMap(Type::getName,
Arrays::asList));
+ }
+
+ private Map<String, List<Type>> getCaseInsensitiveParquetFieldMap(GroupType
schema) {
+return schema.getFields().stream()
+.collect(Collectors.groupingBy(f ->
f.getName().toLowerCase(Locale.ROOT)));
+ }
+
+ private Type getMatchingParquetFieldById(
+ StructField f,
+ Map<Integer, List<Type>> idToParquetFieldMap,
+ Map<String, List<Type>> nameToParquetFieldMap,
+ boolean isCaseSensitive) {
+List<Type> matched = null;
+int fieldId = 0;
+if (ParquetUtils.hasFieldId(f)) {
+ fieldId = ParquetUtils.getFieldId(f);
+ matched = idToParquetFieldMap.get(fieldId);
+} else {
+ String fieldName = isCaseSensitive ? f.name() :
f.name().toLowerCase(Locale.ROOT);
+ matched = nameToParquetFieldMap.get(fieldName);
+}
+
+if (matched == null || matched.isEmpty()) {
+ return null;
+}
+if (matched.size() > 1) {
+ // Need to fail if there is ambiguity, i.e. more than one field is
matched
+ String parquetTypesString =
+ matched.stream().map(Type::getName).collect(Collectors.joining("[",
", ", "]"));
+ throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+ fieldId, parquetTypesString);
+} else {
+ return matched.get(0);
+}
+ }
+
+ // Derived from CometParquetReadSupport.matchFieldId
+ private String getMatchingNameById(
+ StructField f,
+ Map<Integer, List<Type>> idToParquetFieldMap,
+ Map<String, List<Type>> nameToParquetFieldMap /*, Map
nameMap*/,
Review Comment:
is this commented out code still needed?
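
To illustrate the lookup the quoted hunk performs (match by field ID when one is present, otherwise by name, and fail when more than one field matches), here is a minimal, self-contained sketch. `FieldIdLookupSketch` and its nested `Field` class are hypothetical stand-ins, not Parquet, Spark, or Comet classes, and the exception type is an assumption. One detail worth checking against the hunk: `Collectors.joining` takes its arguments as `(delimiter, prefix, suffix)`, so the sketch passes `", "` first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

final class FieldIdLookupSketch {
  /** Hypothetical stand-in for a Parquet field: a name plus an optional field ID. */
  static final class Field {
    final String name;
    final Integer id; // null when the file carries no field ID

    Field(String name, Integer id) {
      this.name = name;
      this.id = id;
    }
  }

  /** Groups fields by ID, skipping fields that have none. */
  static Map<Integer, List<Field>> byId(List<Field> fields) {
    return fields.stream()
        .filter(f -> f.id != null)
        .collect(Collectors.groupingBy(f -> f.id));
  }

  /** Groups fields by lower-cased name for case-insensitive lookup. */
  static Map<String, List<Field>> byLowerCaseName(List<Field> fields) {
    return fields.stream()
        .collect(Collectors.groupingBy(f -> f.name.toLowerCase(Locale.ROOT)));
  }

  /** Returns the single match, null when nothing matched, or fails on ambiguity. */
  static Field single(List<Field> matched, String key) {
    if (matched == null || matched.isEmpty()) {
      return null;
    }
    if (matched.size() > 1) {
      // joining(delimiter, prefix, suffix): the delimiter comes first.
      String names =
          matched.stream().map(f -> f.name).collect(Collectors.joining(", ", "[", "]"));
      throw new IllegalStateException("Ambiguous match for " + key + ": " + names);
    }
    return matched.get(0);
  }

  static Field matchById(int id, List<Field> fields) {
    return single(byId(fields).get(id), "id " + id);
  }

  static Field matchByNameIgnoreCase(String name, List<Field> fields) {
    return single(byLowerCaseName(fields).get(name.toLowerCase(Locale.ROOT)), "name " + name);
  }

  public static void main(String[] args) {
    List<Field> fields =
        Arrays.asList(new Field("a", 1), new Field("b", 2), new Field("B", 2), new Field("c", null));
    System.out.println(matchById(1, fields).name);
    System.out.println(matchByNameIgnoreCase("C", fields).name);
  }
}
```

Grouping into lists rather than building a one-to-one map is what makes the ambiguity check possible: duplicates survive the grouping step and are only rejected at lookup time.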
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
andygrove commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2141233094
##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -424,6 +425,187 @@ public void init() throws Throwable {
isInitialized = true;
}
+ private ParquetColumn getParquetColumn(MessageType schema, StructType
sparkSchema) {
+// We use a different config from the config that is passed in.
+// This follows the setting used in Spark's
SpecificParquetRecordReaderBase
+Configuration config = new Configuration();
+config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(),
false);
+config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+ParquetToSparkSchemaConverter converter = new
ParquetToSparkSchemaConverter(config);
+return converter.convertParquetColumn(schema, Option.apply(sparkSchema));
+ }
+
+ private Map<Integer, List<Type>> getIdToParquetFieldMap(GroupType type) {
+return type.getFields().stream()
+.filter(f -> f.getId() != null)
+.collect(Collectors.groupingBy(f -> f.getId().intValue()));
+ }
+
+ private Map<String, List<Type>> getCaseSensitiveParquetFieldMap(GroupType
schema) {
+return schema.getFields().stream().collect(Collectors.toMap(Type::getName,
Arrays::asList));
+ }
+
+ private Map<String, List<Type>> getCaseInsensitiveParquetFieldMap(GroupType
schema) {
+return schema.getFields().stream()
+.collect(Collectors.groupingBy(f ->
f.getName().toLowerCase(Locale.ROOT)));
+ }
+
+ private Type getMatchingParquetFieldById(
+ StructField f,
+ Map<Integer, List<Type>> idToParquetFieldMap,
+ Map<String, List<Type>> nameToParquetFieldMap,
+ boolean isCaseSensitive) {
+List<Type> matched = null;
+int fieldId = 0;
+if (ParquetUtils.hasFieldId(f)) {
+ fieldId = ParquetUtils.getFieldId(f);
+ matched = idToParquetFieldMap.get(fieldId);
+} else {
+ String fieldName = isCaseSensitive ? f.name() :
f.name().toLowerCase(Locale.ROOT);
+ matched = nameToParquetFieldMap.get(fieldName);
+}
+
+if (matched == null || matched.isEmpty()) {
+ return null;
+}
+if (matched.size() > 1) {
+ // Need to fail if there is ambiguity, i.e. more than one field is
matched
+ String parquetTypesString =
+ matched.stream().map(Type::getName).collect(Collectors.joining("[",
", ", "]"));
+ throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+ fieldId, parquetTypesString);
+} else {
+ return matched.get(0);
+}
+ }
+
+ // Derived from CometParquetReadSupport.matchFieldId
+ private String getMatchingNameById(
+ StructField f,
+ Map<Integer, List<Type>> idToParquetFieldMap,
+ Map<String, List<Type>> nameToParquetFieldMap /*, Map
nameMap*/,
+ boolean isCaseSensitive) {
+Type matched =
+getMatchingParquetFieldById(f, idToParquetFieldMap,
nameToParquetFieldMap, isCaseSensitive);
+
+// When there is no ID match, we use a fake name to avoid a name match by
accident
+// We need this name to be unique as well, otherwise there will be type
conflicts
+if (matched == null /*|| matched.isEmpty()*/) {
Review Comment:
is this commented out code still needed?
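
The code comment in the hunk above describes falling back to a unique fake name when no field matches, so that a later name-based lookup cannot match by accident. A minimal sketch of that idea follows. The class name, the `_fake_name_` prefix, and the use of `UUID` are assumptions made for illustration; the hunk shown here does not include the actual naming scheme.

```java
import java.util.UUID;

final class FakeNameSketch {
  // Hypothetical prefix; the real naming scheme is not shown in the quoted hunk.
  static final String FAKE_PREFIX = "_fake_name_";

  /** Builds a name that is unique per call, so two unmatched fields never collide. */
  static String fakeColumnName() {
    return FAKE_PREFIX + UUID.randomUUID();
  }

  /** Returns the matched field name, or a unique fake name when nothing matched. */
  static String resolveName(String matchedName) {
    return matchedName != null ? matchedName : fakeColumnName();
  }

  public static void main(String[] args) {
    System.out.println(resolveName("id_col"));
    System.out.println(resolveName(null).startsWith(FAKE_PREFIX));
  }
}
```

Uniqueness matters because two unmatched fields that shared one placeholder name would be treated as the same column, which is the type conflict the comment warns about.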
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
parthchandra commented on PR #1815: URL: https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2922733351 Changing this to draft while I look into the CI failures
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
parthchandra commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114910872
##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[]
columnReaders) {
public void init() throws Throwable {
conf.set("spark.sql.parquet.binaryAsString", "false");
-conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+conf.set("spark.sql.parquet.int96AsTimestamp", "true");
Review Comment:
Rethinking this along with investigating the CI failures.
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
parthchandra commented on PR #1815: URL: https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2920712348 @mbutrovich need your help here. CI is failing due to tests involving default values. Should these tests be enabled?
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
parthchandra commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114833029
##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[]
columnReaders) {
public void init() throws Throwable {
conf.set("spark.sql.parquet.binaryAsString", "false");
-conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+conf.set("spark.sql.parquet.int96AsTimestamp", "true");
Review Comment:
https://github.com/apache/datafusion-comet/issues/1816
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
parthchandra commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114830345
##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[]
columnReaders) {
public void init() throws Throwable {
conf.set("spark.sql.parquet.binaryAsString", "false");
-conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+conf.set("spark.sql.parquet.int96AsTimestamp", "true");
Review Comment:
Let me log a follow-up PR to investigate this further. As things stand,
without these configs, many Spark tests fail.
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
andygrove commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2114468324
##
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##
@@ -236,7 +236,7 @@ public NativeBatchReader(AbstractColumnReader[]
columnReaders) {
public void init() throws Throwable {
conf.set("spark.sql.parquet.binaryAsString", "false");
-conf.set("spark.sql.parquet.int96AsTimestamp", "false");
+conf.set("spark.sql.parquet.int96AsTimestamp", "true");
Review Comment:
I still have questions about these configs. In the unlikely event that a user
explicitly sets `spark.sql.parquet.int96AsTimestamp=false`, shouldn't we respect
that?
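
One way to read the question above is "apply the reader's default only when the user has not set the key". The sketch below shows that pattern with a plain `java.util.Map` standing in for the job configuration; `ConfigDefaultSketch` and `applyDefault` are hypothetical names. Hadoop's `Configuration` offers `setIfUnset`, which should behave similarly, but that mapping is an assumption here, and the PR as quoted sets the value unconditionally.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigDefaultSketch {
  static final String INT96_KEY = "spark.sql.parquet.int96AsTimestamp";

  /** Applies a reader default only when the key has no explicit value. */
  static void applyDefault(Map<String, String> conf, String key, String defaultValue) {
    conf.putIfAbsent(key, defaultValue);
  }

  public static void main(String[] args) {
    Map<String, String> userSet = new HashMap<>();
    userSet.put(INT96_KEY, "false"); // explicit user choice
    applyDefault(userSet, INT96_KEY, "true");
    System.out.println(userSet.get(INT96_KEY));

    Map<String, String> unset = new HashMap<>();
    applyDefault(unset, INT96_KEY, "true");
    System.out.println(unset.get(INT96_KEY));
  }
}
```

Whether this is safe for the reader depends on the test failures mentioned elsewhere in the thread, so treat it as an illustration of the reviewer's question rather than a proposed fix.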
Re: [PR] fix: map parquet field_id correctly (native_iceberg_compat) [datafusion-comet]
codecov-commenter commented on PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#issuecomment-2920187518
## [Codecov](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
Attention: Patch coverage is `0%` with `2 lines` in your changes missing coverage. Please review.
> Project coverage is 32.09%. Comparing base [(`f09f8af`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/f09f8af64c6599255e116a376f4f008f2fd63b43?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`b455267`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/b455267e6f4e3818a111e66b2768d61101a8587c?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 227 commits behind head on main.

| [Files with missing lines](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...va/org/apache/comet/parquet/NativeBatchReader.java](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fcomet%2Fparquet%2FNativeBatchReader.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jb21ldC9wYXJxdWV0L05hdGl2ZUJhdGNoUmVhZGVyLmphdmE=) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@              Coverage Diff              @@
##               main    #1815       +/-   ##
=============================================
- Coverage     56.12%   32.09%   -24.03%
+ Complexity      976      650      -326
=============================================
  Files           119      129       +10
  Lines         11743    12617      +874
  Branches       2251     2348       +97
=============================================
- Hits           6591     4050     -2541
- Misses         4012     7776     +3764
+ Partials       1140      791      -349
```

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/datafusion-comet/pull/1815?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
