[GitHub] [flink] afedulov commented on a diff in pull request #19286: [FLINK-25931] Add projection pushdown support for CsvFormatFactory

2022-06-03 Thread GitBox


afedulov commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r889068627


##
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java:
##
@@ -227,6 +228,24 @@ public void testInvalidIgnoreParseError() {
 createTableSink(SCHEMA, options);
 }
 
+@Test
+public void testProjectionPushdown() throws IOException {

Review Comment:
   I've added some more edge cases. Please let me know if you had something beyond that in mind: the input-output relationship seems pretty straightforward, and at the moment I can't think of more sophisticated scenarios.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #19286: [FLINK-25931] Add projection pushdown support for CsvFormatFactory

2022-06-03 Thread GitBox


afedulov commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r889011653


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##
@@ -206,6 +235,6 @@ public int hashCode() {
 csvSchema.getArrayElementSeparator(),
 csvSchema.getQuoteChar(),
 csvSchema.getEscapeChar(),
-csvSchema.getNullValue());
+Arrays.hashCode(csvSchema.getNullValue()));

Review Comment:
   Because otherwise we hash the array reference (arrays inherit the identity-based `Object.hashCode()`) instead of the array's content, which is what we actually want. For a quick test:
   ```
   import java.util.Arrays;

   public class ArrayHashCodeDemo {
       public static void main(String[] args) {
           char[] arr1 = { 'x', 'y', 'z' };
           char[] arr2 = { 'x', 'y', 'z' };
           // Identity-based hash: typically differs for two distinct array objects
           System.out.println("arr1.hashCode(): " + arr1.hashCode() + "; arr2.hashCode(): " + arr2.hashCode());
           // Content-based hash: identical for arrays with equal elements
           System.out.println("Arrays.hashCode(arr1): " + Arrays.hashCode(arr1) + "; Arrays.hashCode(arr2): " + Arrays.hashCode(arr2));
       }
   }
   ```
   I could move it into a separate [hotfix] PR if you think that is more appropriate.
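   The general rule behind the change: `hashCode()` has to be consistent with `equals()`, so if `equals()` compares the null-value `char[]` by content, `hashCode()` must hash it by content as well. A minimal sketch with a hypothetical `NullValueHolder` class (not the actual deserialization schema, and reduced to two fields):

   ```java
   import java.util.Arrays;
   import java.util.Objects;

   /** Hypothetical value class holding a char[] field, e.g. a CSV null literal. */
   public final class NullValueHolder {
       private final char columnSeparator;
       private final char[] nullValue;

       NullValueHolder(char columnSeparator, char[] nullValue) {
           this.columnSeparator = columnSeparator;
           this.nullValue = nullValue;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) {
               return true;
           }
           if (!(o instanceof NullValueHolder)) {
               return false;
           }
           NullValueHolder that = (NullValueHolder) o;
           // Compare the array by content, not by reference
           return columnSeparator == that.columnSeparator
                   && Arrays.equals(nullValue, that.nullValue);
       }

       @Override
       public int hashCode() {
           // Objects.hash(columnSeparator, nullValue) would use the array's identity hash;
           // Arrays.hashCode keeps hashCode() consistent with the content-based equals().
           return Objects.hash(columnSeparator, Arrays.hashCode(nullValue));
       }

       public static void main(String[] args) {
           NullValueHolder a = new NullValueHolder(',', new char[] {'N', 'A'});
           NullValueHolder b = new NullValueHolder(',', new char[] {'N', 'A'});
           System.out.println(a.equals(b) + " " + (a.hashCode() == b.hashCode()));
       }
   }
   ```

   Running `main` prints `true true`; with the identity-based variant, the second value would typically be `false` even though the two objects are equal.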
   









[GitHub] [flink] afedulov commented on a diff in pull request #19286: [FLINK-25931] Add projection pushdown support for CsvFormatFactory

2022-05-02 Thread GitBox


afedulov commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r862972424


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##
@@ -65,13 +65,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
 private final boolean ignoreParseErrors;
 
 private CsvRowDataDeserializationSchema(
-RowType rowType,
+RowType rowResultType,
 TypeInformation resultTypeInfo,
 CsvSchema csvSchema,
 boolean ignoreParseErrors) {
 this.resultTypeInfo = resultTypeInfo;
 this.runtimeConverter =
-new CsvToRowDataConverters(ignoreParseErrors).createRowConverter(rowType, true);
+new CsvToRowDataConverters(ignoreParseErrors)
+.createRowConverter(rowResultType, true);

Review Comment:
   I moved the converter initialization to the Builder.
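   One way to picture this change, as a minimal sketch under simplifying assumptions: `SketchSchema`, its `Builder`, and the `Function<String[], String[]>` converter are hypothetical stand-ins, not Flink's classes. The builder holds both the read columns and the result columns, derives the converter once in `build()`, and hands the finished converter to a private constructor that no longer needs to know about either type:

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Function;

   /** Hypothetical, simplified stand-in for a deserialization schema created via a Builder. */
   public final class SketchSchema {
       private final Function<String[], String[]> converter;

       // The private constructor only receives the ready-made converter
       private SketchSchema(Function<String[], String[]> converter) {
           this.converter = converter;
       }

       public String[] deserialize(String[] readRow) {
           return converter.apply(readRow);
       }

       public static final class Builder {
           private final List<String> readColumns;
           private final List<String> resultColumns;

           public Builder(List<String> readColumns, List<String> resultColumns) {
               this.readColumns = readColumns;
               this.resultColumns = resultColumns;
           }

           public SketchSchema build() {
               // Resolve each result column to its position in the read row once, up front
               int[] positions = new int[resultColumns.size()];
               for (int i = 0; i < positions.length; i++) {
                   int pos = readColumns.indexOf(resultColumns.get(i));
                   if (pos < 0) {
                       throw new IllegalArgumentException("Unknown column: " + resultColumns.get(i));
                   }
                   positions[i] = pos;
               }
               // The converter is created here, in the Builder, rather than in the constructor
               Function<String[], String[]> converter = row -> {
                   String[] out = new String[positions.length];
                   for (int i = 0; i < positions.length; i++) {
                       out[i] = row[positions[i]];
                   }
                   return out;
               };
               return new SketchSchema(converter);
           }
       }

       public static void main(String[] args) {
           SketchSchema schema =
                   new Builder(Arrays.asList("id", "name", "age"), Arrays.asList("age", "id")).build();
           System.out.println(Arrays.toString(schema.deserialize(new String[] {"1", "Alice", "30"})));
       }
   }
   ```

   Running `main` prints `[30, 1]`. Resolving the positions in `build()` keeps the per-record work to plain array indexing, and an unknown result column fails when the schema is built rather than on the first record.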






[GitHub] [flink] afedulov commented on a diff in pull request #19286: [FLINK-25931] Add projection pushdown support for CsvFormatFactory

2022-04-27 Thread GitBox


afedulov commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r859663940


##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
 @Internal
 public static class Builder {
 
-private final RowType rowType;
+private final RowType rowResultType;
 private final TypeInformation resultTypeInfo;
 private CsvSchema csvSchema;
 private boolean ignoreParseErrors;
 
+/**
+ * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+ * parameters.
+ */
+public Builder(
+RowType rowReadType,
+RowType rowResultType,

Review Comment:
   Good point. The fields originally had no JavaDoc, so I missed it.
   The naming comes from the original signature:
   ```
   public Builder(RowType rowType, TypeInformation resultTypeInfo)
   ```
   The idea is to distinguish the type that defines the CsvSchema used to read the data from the file from the type that describes the expected output. "Projection" seems too specific a term, tied to the SQL optimization world. I can see this being used outside of that scope (simple filtering for other reasons, or expanding nested fields according to a "wider" row), depending on the converter used.
   I've added the missing JavaDoc.
   



##
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
 @Internal
 public static class Builder {
 
-private final RowType rowType;
+private final RowType rowResultType;
 private final TypeInformation resultTypeInfo;
 private CsvSchema csvSchema;
 private boolean ignoreParseErrors;
 
+/**
+ * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+ * parameters.
+ */
+public Builder(
+RowType rowReadType,
+RowType rowResultType,
+TypeInformation resultTypeInfo) {
+Preconditions.checkNotNull(rowReadType, "RowType must not be null.");
+Preconditions.checkNotNull(rowResultType, "RowType must not be null.");
+Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
+this.rowResultType = rowResultType;
+this.resultTypeInfo = resultTypeInfo;
+this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);

Review Comment:
   We rely on field names for the conversion:
   
https://github.com/apache/flink/blob/c31452bab1fb3b9a02ab616e4c5fe5e87346dfb4/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L78-L84
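   Because top-level fields are matched by name, the CsvSchema has to be derived from the read type: it is what attaches the file's column names to the parsed values, and the converter then picks the result fields out by those names. A plain-Java sketch of that two-step idea, assuming a naive comma split in place of the Jackson CSV parser (no quoting or escaping); `parseLine` and `toResultRow` are hypothetical helpers:

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class NameBasedProjection {

       /** Hypothetical parsing step: keys each value by its column name from the read type. */
       static Map<String, String> parseLine(String line, List<String> readColumns) {
           String[] values = line.split(",", -1); // naive split, keeps trailing empty values
           if (values.length != readColumns.size()) {
               throw new IllegalArgumentException(
                       "Expected " + readColumns.size() + " columns: " + line);
           }
           Map<String, String> record = new HashMap<>();
           for (int i = 0; i < values.length; i++) {
               record.put(readColumns.get(i), values[i]);
           }
           return record;
       }

       /** Picks the result fields by name, in the order of the result type. */
       static String[] toResultRow(Map<String, String> record, List<String> resultColumns) {
           String[] row = new String[resultColumns.size()];
           for (int i = 0; i < row.length; i++) {
               row[i] = record.get(resultColumns.get(i));
           }
           return row;
       }

       public static void main(String[] args) {
           List<String> readColumns = Arrays.asList("id", "name", "age");
           List<String> resultColumns = Arrays.asList("age", "id");
           Map<String, String> record = parseLine("1,Alice,30", readColumns);
           System.out.println(Arrays.toString(toResultRow(record, resultColumns)));
       }
   }
   ```

   Running `main` prints `[30, 1]`. In this sketch, keying the parsed values by the result columns instead would either fail the arity check or attach the wrong names to the file's columns, which is why the read type is kept separate from the result type.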


