[GitHub] [flink] afedulov commented on a diff in pull request #19286: [FLINK-25931] Add projection pushdown support for CsvFormatFactory
afedulov commented on code in PR #19286: URL: https://github.com/apache/flink/pull/19286#discussion_r889068627

## flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java:

```
@@ -227,6 +228,24 @@ public void testInvalidIgnoreParseError() {
         createTableSink(SCHEMA, options);
     }

+    @Test
+    public void testProjectionPushdown() throws IOException {
```

Review Comment: I've added some more edge cases. Please let me know if you had something beyond that in mind - the input-output relationship seems pretty straightforward, and at the moment I'm running out of ideas for more sophisticated scenarios.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
afedulov commented on code in PR #19286: URL: https://github.com/apache/flink/pull/19286#discussion_r889011653

## flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:

```
@@ -206,6 +235,6 @@ public int hashCode() {
         csvSchema.getArrayElementSeparator(),
         csvSchema.getQuoteChar(),
         csvSchema.getEscapeChar(),
-        csvSchema.getNullValue());
+        Arrays.hashCode(csvSchema.getNullValue()));
```

Review Comment: Because otherwise we hash the array reference (identity), not the array's contents, which is what we actually want. For a quick test:

```
public static void main(String[] args) {
    char[] arr1 = {'x', 'y', 'z'};
    char[] arr2 = {'x', 'y', 'z'};
    System.out.println("arr1.hashCode(): " + arr1.hashCode()
            + "; arr2.hashCode(): " + arr2.hashCode());
    System.out.println("Arrays.hashCode(arr1): " + Arrays.hashCode(arr1)
            + "; Arrays.hashCode(arr2): " + Arrays.hashCode(arr2));
}
```

I could move it into a separate [hotfix] PR if you think it is more appropriate.
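To spell out the point for `equals`/`hashCode` pairs: a class holding a `char[]` field must use `Arrays.equals` and `Arrays.hashCode` to honor the contract that equal objects have equal hashes. The following is a minimal, self-contained sketch (the `Config` class and its `nullValue` field are hypothetical stand-ins, not Flink code):

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical class with a char[] field, mirroring CsvSchema's nullValue.
final class Config {
    private final char[] nullValue;

    Config(char[] nullValue) {
        this.nullValue = nullValue;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Config)) {
            return false;
        }
        // Arrays.equals compares contents; == would compare references.
        return Arrays.equals(nullValue, ((Config) o).nullValue);
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode hashes the contents, so two configs with equal
        // arrays get the same hash; nullValue.hashCode() would not.
        return Objects.hash(Arrays.hashCode(nullValue));
    }
}

public class HashDemo {
    public static void main(String[] args) {
        Config a = new Config(new char[] {'x', 'y', 'z'});
        Config b = new Config(new char[] {'x', 'y', 'z'});
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}
```

With the identity-based `nullValue.hashCode()` instead, the second line would print `false` whenever the two arrays are distinct objects, breaking hash-based collections.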
afedulov commented on code in PR #19286: URL: https://github.com/apache/flink/pull/19286#discussion_r862972424

## flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:

```
@@ -65,13 +65,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
     private final boolean ignoreParseErrors;

     private CsvRowDataDeserializationSchema(
-            RowType rowType,
+            RowType rowResultType,
             TypeInformation resultTypeInfo,
             CsvSchema csvSchema,
             boolean ignoreParseErrors) {
         this.resultTypeInfo = resultTypeInfo;
         this.runtimeConverter =
-                new CsvToRowDataConverters(ignoreParseErrors).createRowConverter(rowType, true);
+                new CsvToRowDataConverters(ignoreParseErrors)
+                        .createRowConverter(rowResultType, true);
```

Review Comment: I moved the converter initialization to the Builder.
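The general shape of that refactoring can be sketched as follows. This is a hypothetical, self-contained illustration (not Flink code, all names invented): the builder knows both the "read" and the "result" row types, derives the schema and converter from them, and hands the finished pieces to a constructor that only assigns fields.

```java
// Sketch of moving derived-state construction into the builder: the product's
// constructor does no work, so nothing is re-derived from a type the product
// would otherwise have to store.
final class DeserSchema {
    final String csvSchema;        // stands in for the Jackson CsvSchema
    final String runtimeConverter; // stands in for the row converter

    // Constructor only assigns; no derivation logic here.
    private DeserSchema(String csvSchema, String runtimeConverter) {
        this.csvSchema = csvSchema;
        this.runtimeConverter = runtimeConverter;
    }

    static final class Builder {
        private final String readType;
        private final String resultType;

        Builder(String readType, String resultType) {
            this.readType = readType;
            this.resultType = resultType;
        }

        DeserSchema build() {
            // Derived state is computed here: the CSV schema follows the read
            // type, the converter follows the (possibly narrower) result type.
            return new DeserSchema(
                    "schema(" + readType + ")", "converter(" + resultType + ")");
        }
    }
}

public class BuilderDemo {
    public static void main(String[] args) {
        DeserSchema s = new DeserSchema.Builder("id,name,price", "price").build();
        System.out.println(s.csvSchema);        // schema(id,name,price)
        System.out.println(s.runtimeConverter); // converter(price)
    }
}
```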
afedulov commented on code in PR #19286: URL: https://github.com/apache/flink/pull/19286#discussion_r859663940

## flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:

```
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
 @Internal
 public static class Builder {

-    private final RowType rowType;
+    private final RowType rowResultType;
     private final TypeInformation resultTypeInfo;
     private CsvSchema csvSchema;
     private boolean ignoreParseErrors;

+    /**
+     * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+     * parameters.
+     */
+    public Builder(
+            RowType rowReadType,
+            RowType rowResultType,
```

Review Comment: Good point, the Javadoc of the fields was originally not there, so I missed it. The naming comes from the original signature:

```
public Builder(RowType rowType, TypeInformation resultTypeInfo)
```

The idea is to underline which CsvSchema the data is going to be read with from the file, and what the expected shape of the result is. "Projection" seems a bit too specific a term from the SQL optimization world. I can see this being used outside of that scope (simple filtering for whatever other reasons, or expansion of nested fields according to a "wider" row), depending on the converter used. I've added the missing Javadoc.

## flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:

```
+    public Builder(
+            RowType rowReadType,
+            RowType rowResultType,
+            TypeInformation resultTypeInfo) {
+        Preconditions.checkNotNull(rowReadType, "RowType must not be null.");
+        Preconditions.checkNotNull(rowResultType, "RowType must not be null.");
+        Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
+        this.rowResultType = rowResultType;
+        this.resultTypeInfo = resultTypeInfo;
+        this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);
```

Review Comment: We rely on field names for the conversion: https://github.com/apache/flink/blob/c31452bab1fb3b9a02ab616e4c5fe5e87346dfb4/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L78-L84
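The name-based matching described above can be illustrated with a minimal, self-contained sketch (hypothetical code, not the actual Flink converter): the "read type" lists every CSV column in file order, the "result type" lists only the projected fields, and each result value is resolved by field name rather than by position.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of name-based projection: read fields map positionally onto the CSV
// row, then result fields are looked up by name, so narrowing and reordering
// both fall out of the same mechanism.
public class ProjectionSketch {
    static List<String> project(
            List<String> readFields, List<String> resultFields, List<String> csvRow) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (int i = 0; i < readFields.size(); i++) {
            byName.put(readFields.get(i), csvRow.get(i));
        }
        List<String> out = new ArrayList<>();
        for (String field : resultFields) {
            // Resolve by name, not by position.
            out.add(byName.get(field));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> read = List.of("id", "name", "price");
        List<String> result = List.of("price", "id"); // projected and reordered
        System.out.println(project(read, result, List.of("1", "apple", "0.5")));
        // [0.5, 1]
    }
}
```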