[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r311585045

## File path: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java ##
@@ -41,5 +80,62 @@ public void testDuplicate() {
         writer.setSyncOnFlush(false);
         assertFalse(StreamWriterBaseComparator.equals(writer, other));
         assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
+
+    }
+
+    @Test
+    public void testMultiRowdelimiters() throws IOException {
+        String rowDelimiter1 = "\n";
+        String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
+        Path testFile1 = new Path(outputDir + "/test01");
+        testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
+
+        String rowDelimiter2 = "\r\n";
+        String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
+        Path testFile2 = new Path(outputDir + "/test02");
+        testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
+
+        String rowDelimiter3 = "*";
+        String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
+        Path testFile3 = new Path(outputDir + "/test03");
+        testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
+
+        String rowDelimiter4 = "##";
+        String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
+        Path testFile4 = new Path(outputDir + "/test04");
+        testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
+
+    }
+
+    private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
+        StringWriter writer = new StringWriter(charset, rowDelimiter);
+        writer.open(dfs, outputFile);
+        StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
+        while (lineTokenizer.hasMoreTokens()){
+            writer.write(lineTokenizer.nextToken());
+        }
+        writer.close();
+        FSDataInputStream inStream = dfs.open(outputFile);
+        byte[] buffer = new byte[inputData.getBytes(charset).length];
+        readFully(inStream, buffer);
+        inStream.close();
+        String outputData = new String(buffer, charset);
+        Assert.assertEquals(inputData, outputData);
+
+    }
+
+    private void readFully(InputStream in, byte[] buffer) throws IOException {

Review comment:
Could you just use `org.apache.flink.util.IOUtils#readFully` to avoid code duplication?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
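For readers who have not met the "read fully" contract that the comment above relies on, here is a minimal JDK-only sketch. It assumes the suggested Flink utility behaves like the usual read-until-full helper; `ReadFullySketch`, `trickle` and `readString` are hypothetical names used only for illustration and are not part of Flink or of this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone sketch; not Flink code.
final class ReadFullySketch {

    // Keeps calling read() until exactly len bytes are stored in buf[off .. off+len).
    static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        int remaining = len;
        while (remaining > 0) {
            int n = in.read(buf, off + (len - remaining), remaining);
            if (n < 0) {
                throw new EOFException("stream ended with " + remaining + " bytes still expected");
            }
            remaining -= n;
        }
    }

    // A stream that returns at most one byte per read call,
    // to show why a single read() is not enough to fill a buffer.
    static InputStream trickle(byte[] data) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, 1));
            }
        };
    }

    // Convenience wrapper so callers need not handle the checked exception.
    static String readString(byte[] data) {
        byte[] buf = new byte[data.length];
        try {
            readFully(trickle(data), buf, 0, buf.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buf, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(readString("A##B##C".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In the test itself, the private helper would simply be dropped in favour of the shared utility the reviewer names; the loop above only illustrates the behaviour such a helper is expected to provide.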
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300620094

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -40,12 +40,18 @@
     private transient Charset charset;
+    private String rowDelimiter;
+
+    private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+    private byte[] rowDelimiterBytes;
+
     /**
      * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
      * strings to bytes.
      */
     public StringWriter() {
-        this("UTF-8");
+        this("UTF-8", "\n");

Review comment:
Yes, as I wrote in another comment, I missed that `rowDelimiterBytes` depends on `charset`, which is initialised only in `open`. Normally I would change `charset` to a `private final transient` field, but this might require adding a `throws IOException` clause to the constructor and would change the current contract a little, which is a separate, independent issue. In that case, +1 for keeping the `rowDelimiterBytes` field non-final, as you initially proposed.
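The dependency described in this comment can be illustrated with a small JDK-only sketch. It is a simplified stand-in under stated assumptions, not the Flink class: `DelimiterWriterSketch`, its `open(OutputStream)` signature and the `encode` helper are all hypothetical. The point is only that the delimiter bytes cannot be computed before the charset is resolved, so a field set in `open` cannot be `final`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

// Hypothetical, simplified stand-in for the writer under review; not the Flink class.
final class DelimiterWriterSketch {
    private final String charsetName;
    private final String rowDelimiter;

    // Resolved in open(), so these cannot be final.
    private transient Charset charset;
    private byte[] rowDelimiterBytes;
    private OutputStream out;

    DelimiterWriterSketch(String charsetName, String rowDelimiter) {
        this.charsetName = charsetName;
        this.rowDelimiter = rowDelimiter;
    }

    void open(OutputStream target) {
        charset = Charset.forName(charsetName);
        // Encode the delimiter once per open() rather than once per write().
        rowDelimiterBytes = rowDelimiter.getBytes(charset);
        out = target;
    }

    void write(String element) {
        try {
            out.write(element.getBytes(charset));
            out.write(rowDelimiterBytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes all elements to an in-memory buffer and returns the raw bytes.
    static byte[] encode(String charsetName, String rowDelimiter, String... elements) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DelimiterWriterSketch writer = new DelimiterWriterSketch(charsetName, rowDelimiter);
        writer.open(buffer);
        for (String element : elements) {
            writer.write(element);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encode("UTF-8", "\n", "A", "B").length);
        System.out.println(encode("UTF-16LE", "\n", "A", "B").length);
    }
}
```

The same `"\n"` delimiter occupies one byte in UTF-8 and two in UTF-16LE, which is why the encoded form has to wait for the charset.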
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300607832

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -40,12 +40,18 @@
     private transient Charset charset;
+    private String rowDelimiter;
+
+    private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+    private byte[] rowDelimiterBytes;
+
     /**
      * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
      * strings to bytes.
      */
     public StringWriter() {
-        this("UTF-8");
+        this("UTF-8", "\n");

Review comment:
I would keep the `rowDelimiterBytes` field, just to make sure that we do not introduce a performance regression from converting `\n` to bytes via `charset` on every `write` call. Otherwise I think we would need to benchmark this.
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300607315

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -40,12 +40,18 @@
     private transient Charset charset;
+    private String rowDelimiter;

Review comment:
Ok, I missed that. Thanks for the explanation.
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300411710

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -40,12 +40,18 @@
     private transient Charset charset;
+    private String rowDelimiter;

Review comment:
Could you mark the `rowDelimiter` and `rowDelimiterBytes` fields final (and set them in the constructor), as @StephanEwen suggested? `charsetName` could also be final.
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300412460

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -82,7 +103,7 @@ public void open(FileSystem fs, Path path) throws IOException {
     public void write(T element) throws IOException {
         FSDataOutputStream outputStream = getStream();
         outputStream.write(element.toString().getBytes(charset));
-        outputStream.write('\n');
+        outputStream.write(rowDelimiterBytes);

Review comment:
You are still not testing anywhere that the `rowDelimiterBytes` field is used correctly. Please add a test in the `StringWriterTest` class that writes some elements with the standard delimiter and with a custom one, and checks that the result is as expected.
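A rough idea of what such a check asserts, written against the JDK only so it can run without Flink. `DelimiterExpectationSketch` and `writeAll` are hypothetical names; the method merely mimics the `write` shown in the diff above (element bytes followed by delimiter bytes), so treat it as a sketch of the expectation rather than as the requested `StringWriterTest` case.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the expected output shape; not the Flink test.
final class DelimiterExpectationSketch {

    // Mimics write() in the diff: element bytes, then delimiter bytes, for every element.
    static String writeAll(List<String> elements, String rowDelimiter, Charset charset) {
        byte[] delimiterBytes = rowDelimiter.getBytes(charset);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String element : elements) {
            byte[] elementBytes = element.getBytes(charset);
            out.write(elementBytes, 0, elementBytes.length);
            out.write(delimiterBytes, 0, delimiterBytes.length);
        }
        return new String(out.toByteArray(), charset);
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("A", "B", "C");
        // Standard delimiter first, then the custom ones.
        for (String delimiter : new String[] {"\n", "\r\n", "*", "##"}) {
            String expected = "A" + delimiter + "B" + delimiter + "C" + delimiter;
            String actual = writeAll(rows, delimiter, StandardCharsets.UTF_8);
            if (!expected.equals(actual)) {
                throw new AssertionError("unexpected output for delimiter of length " + delimiter.length());
            }
        }
        System.out.println("all delimiters produced the expected output");
    }
}
```

Because the delimiter is appended after every element, the expected string ends with a delimiter after the last row as well.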
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300413608

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -40,12 +40,18 @@
     private transient Charset charset;
+    private String rowDelimiter;
+
+    private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+    private byte[] rowDelimiterBytes;
+
     /**
      * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
      * strings to bytes.
      */
     public StringWriter() {
-        this("UTF-8");
+        this("UTF-8", "\n");

Review comment:
nit: You can keep this line as it was before (`this("UTF-8")`) to avoid a small code duplication.
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297580610

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -54,13 +59,15 @@ public StringWriter() {
      *
      * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
      */
-    public StringWriter(String charsetName) {
+    public StringWriter(String charsetName, String rowDelimiter) {

Review comment:
Please update the `@param` Javadoc. Secondly, I'm not sure, but isn't `StringWriter` part of the public API, and shouldn't we preserve the old constructor?
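One common way to keep an existing constructor while adding a parameter is to chain the constructors, so the default lives in a single place. The sketch below assumes that approach; `ConstructorChainSketch` and its getters are hypothetical, and only the `DEFAULT_ROW_DELIMITER` name and the `"UTF-8"` default are taken from the diffs in this thread.

```java
// Hypothetical sketch of constructor chaining; not the Flink class.
final class ConstructorChainSketch {
    static final String DEFAULT_ROW_DELIMITER = "\n";

    private final String charsetName;
    private final String rowDelimiter;

    // No-argument constructor keeps delegating exactly as before.
    ConstructorChainSketch() {
        this("UTF-8");
    }

    // The pre-existing single-argument constructor is preserved for current callers.
    ConstructorChainSketch(String charsetName) {
        this(charsetName, DEFAULT_ROW_DELIMITER);
    }

    // New constructor: the only place where the fields are assigned.
    ConstructorChainSketch(String charsetName, String rowDelimiter) {
        this.charsetName = charsetName;
        this.rowDelimiter = rowDelimiter;
    }

    String getCharsetName() {
        return charsetName;
    }

    String getRowDelimiter() {
        return rowDelimiter;
    }

    public static void main(String[] args) {
        ConstructorChainSketch legacy = new ConstructorChainSketch("UTF-16");
        System.out.println(legacy.getCharsetName() + " / delimiter length " + legacy.getRowDelimiter().length());
    }
}
```

With this shape, callers of the old constructor compile unchanged, and the default delimiter is spelled out only once.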
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297579509

## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ##
@@ -82,7 +89,10 @@ public void open(FileSystem fs, Path path) throws IOException {
     public void write(T element) throws IOException {
         FSDataOutputStream outputStream = getStream();
         outputStream.write(element.toString().getBytes(charset));
-        outputStream.write('\n');
+        if (rowDelimiterBytes == null) {
+            rowDelimiterBytes = rowDelimiter.getBytes(charset);

Review comment:
Why are you "caching" `rowDelimiterBytes`? Isn't this a premature optimisation?
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297581797

## File path: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java ##
@@ -32,7 +32,7 @@
     @Test
     public void testDuplicate() {

Review comment:
Please also add a unit test (in this file?) to verify that the custom delimiter is actually used.