[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-08-07 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r311585045
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
 ##
 @@ -41,5 +80,62 @@ public void testDuplicate() {
writer.setSyncOnFlush(false);
assertFalse(StreamWriterBaseComparator.equals(writer, other));
assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
+
+   }
+
+   @Test
+   public void testMultiRowdelimiters() throws IOException {
+   String rowDelimiter1 = "\n";
+   String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
+   Path testFile1 = new Path(outputDir + "/test01");
+   testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
+
+   String rowDelimiter2 = "\r\n";
+   String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
+   Path testFile2 = new Path(outputDir + "/test02");
+   testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
+
+   String rowDelimiter3 = "*";
+   String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
+   Path testFile3 = new Path(outputDir + "/test03");
+   testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
+
+   String rowDelimiter4 = "##";
+   String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
+   Path testFile4 = new Path(outputDir + "/test04");
+   testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
+
+   }
+
+   private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
+   StringWriter writer = new StringWriter(charset, rowDelimiter);
+   writer.open(dfs, outputFile);
+   StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
+   while (lineTokenizer.hasMoreTokens()){
+   writer.write(lineTokenizer.nextToken());
+   }
+   writer.close();
+   FSDataInputStream inStream = dfs.open(outputFile);
+   byte[] buffer = new byte[inputData.getBytes(charset).length];
+   readFully(inStream, buffer);
+   inStream.close();
+   String outputData = new String(buffer, charset);
+   Assert.assertEquals(inputData, outputData);
+
+   }
+
+   private void readFully(InputStream in, byte[] buffer) throws IOException {
 
 Review comment:
   Could you just use `org.apache.flink.util.IOUtils#readFully` to avoid code duplication?
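The suggestion is to drop the test's hand-written `readFully` helper. A minimal, self-contained sketch of that idea, using the JDK's `DataInputStream#readFully(byte[])` as a stand-in for `org.apache.flink.util.IOUtils#readFully`: both keep reading until the buffer is full and fail if the stream ends first. With Flink's utility the call would presumably look like `IOUtils.readFully(inStream, buffer, 0, buffer.length)`, but check the signature in the Flink version you build against. The class `ReadFullySketch` and its method are hypothetical. The sketch also makes one property of the test above visible: `write()` appends the delimiter after every element, so the file ends with a trailing delimiter, and because the buffer is sized from `inputData`, that trailing delimiter is never compared.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class; not part of Flink or of the PR.
class ReadFullySketch {

    // Fills a buffer of the requested length, or throws EOFException if the
    // stream ends early. This is the job the hand-written helper was doing.
    static byte[] readExactly(InputStream in, int length) throws IOException {
        byte[] buffer = new byte[length];
        new DataInputStream(in).readFully(buffer);
        return buffer;
    }

    public static void main(String[] args) throws IOException {
        String inputData = "A\nB\nC\nD\nE";
        // Assumed file contents: write() appends the delimiter after every
        // element, so the file ends with a trailing "\n".
        byte[] fileContents = "A\nB\nC\nD\nE\n".getBytes(StandardCharsets.UTF_8);
        int length = inputData.getBytes(StandardCharsets.UTF_8).length;
        byte[] read = readExactly(new ByteArrayInputStream(fileContents), length);
        // Only as many bytes as inputData encodes to are compared, so the
        // trailing delimiter is not checked.
        System.out.println(inputData.equals(new String(read, StandardCharsets.UTF_8))); // prints true
    }
}
```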


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-07-05 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300620094
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -40,12 +40,18 @@
 
private transient Charset charset;
 
+   private String rowDelimiter;
+
+   private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+   private byte[] rowDelimiterBytes;
+
/**
 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
 * strings to bytes.
 */
public StringWriter() {
-   this("UTF-8");
+   this("UTF-8", "\n");
 
 Review comment:
   Yes, as I wrote in another comment, I missed that `rowDelimiterBytes` depends on the `charset`, which is initialised only in `open`.
   
   Normally I would change `charset` to a `private final transient` field, but this might require adding a `throws IOException` clause to the constructor and would slightly change the current contract, which is a separate, independent issue. Given that, I would give +1 for keeping the `rowDelimiterBytes` field non-final, as you initially proposed.
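The conclusion here is that `rowDelimiterBytes` stays non-final because it is derived from `charset`, which only exists after `open`. A minimal sketch of that arrangement, assuming a simplified, hypothetical writer (`OpenTimeDelimiterWriter`) that writes to a plain `OutputStream` instead of Flink's `FSDataOutputStream`: the configuration (`charsetName`, `rowDelimiter`) is final and set in the constructor, while `charset` and `rowDelimiterBytes` are rebuilt in `open()`. `java.nio.charset.Charset` does not implement `Serializable`, which is one reason to keep it transient; marking `rowDelimiterBytes` transient as well is a choice of this sketch, not of the PR.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;

// Hypothetical, simplified stand-in for StringWriter (not Flink's class).
class OpenTimeDelimiterWriter<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    // Configuration: final, set once in the constructor, serialized with the writer.
    private final String charsetName;
    private final String rowDelimiter;

    // Charset is not Serializable, so it is transient and rebuilt in open().
    private transient Charset charset;
    // Derived from charset, so it cannot be final; computed once per open().
    private transient byte[] rowDelimiterBytes;
    private transient OutputStream out;

    OpenTimeDelimiterWriter(String charsetName, String rowDelimiter) {
        this.charsetName = charsetName;
        this.rowDelimiter = rowDelimiter;
    }

    void open(OutputStream out) {
        this.out = out;
        this.charset = Charset.forName(charsetName);
        this.rowDelimiterBytes = rowDelimiter.getBytes(charset);
    }

    void write(T element) throws IOException {
        out.write(element.toString().getBytes(charset));
        out.write(rowDelimiterBytes); // reuses the bytes encoded in open()
    }
}
```

Because only the final `String` fields are serialized, a deserialized copy behaves like the original once `open()` has been called on it.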


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-07-05 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300607832
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -40,12 +40,18 @@
 
private transient Charset charset;
 
+   private String rowDelimiter;
+
+   private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+   private byte[] rowDelimiterBytes;
+
/**
 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
 * strings to bytes.
 */
public StringWriter() {
-   this("UTF-8");
+   this("UTF-8", "\n");
 
 Review comment:
   I would keep the `rowDelimiterBytes` field, just to make sure that we do not introduce a performance regression by converting `\n` to bytes via `charset` on every `write()` call. Otherwise, I think we would need to benchmark this.
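The regression concern is about the default path. A sketch, assuming UTF-8 and a `ByteArrayOutputStream` in place of the Flink output stream, that compares the old behaviour (a hard-coded `'\n'` byte after each element) with encoding the delimiter once and reusing the cached array. For the default `"\n"` delimiter both produce identical bytes, while the cached variant avoids calling `String#getBytes` for the delimiter on every element. The class and method names are hypothetical, and the sketch demonstrates equivalence of output only, not performance; that would still need a benchmark.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical comparison of the old and the cached write paths.
class DelimiterEncodingSketch {

    // Old behaviour: a single hard-coded '\n' byte after each element.
    static byte[] writeOld(String[] elements, Charset charset) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String e : elements) {
            out.write(e.getBytes(charset));
            out.write('\n');
        }
        return out.toByteArray();
    }

    // New behaviour: the delimiter is encoded once, then the cached array is reused.
    static byte[] writeCached(String[] elements, Charset charset, String rowDelimiter) throws IOException {
        byte[] rowDelimiterBytes = rowDelimiter.getBytes(charset); // encoded once
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String e : elements) {
            out.write(e.getBytes(charset));
            out.write(rowDelimiterBytes); // no per-element delimiter encoding
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String[] elements = {"A", "B", "C"};
        System.out.println(Arrays.equals(
                writeOld(elements, StandardCharsets.UTF_8),
                writeCached(elements, StandardCharsets.UTF_8, "\n"))); // prints true
    }
}
```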


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-07-05 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300607315
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -40,12 +40,18 @@
 
private transient Charset charset;
 
+   private String rowDelimiter;
 
 Review comment:
   OK, I missed that. Thanks for the explanation.


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-07-04 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300411710
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -40,12 +40,18 @@
 
private transient Charset charset;
 
+   private String rowDelimiter;
 
 Review comment:
   Could you mark the `rowDelimiter` and `rowDelimiterBytes` fields final (and set them in the constructor) as @StephanEwen suggested? `charsetName` could also be final.


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-07-04 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300412460
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -82,7 +103,7 @@ public void open(FileSystem fs, Path path) throws IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
-   outputStream.write('\n');
+   outputStream.write(rowDelimiterBytes);
 
 Review comment:
   You are still not testing anywhere that the `rowDelimiterBytes` field is used correctly. Please add a test to the `StringWriterTest` class that writes some elements with the standard and a custom delimiter and checks that the result is as expected.
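A sketch of the kind of check being requested, assuming a local temporary file in place of the HDFS `dfs` used by `StringWriterTest`, and a hypothetical helper that inlines the `write()` logic from the diff rather than calling Flink's `StringWriter`. It reads back the whole file, so each expected string has to include the delimiter written after the last element; a missing or doubled delimiter would make the comparison fail.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical test-style sketch; not part of StringWriterTest.
class RowDelimiterFileCheck {

    // Writes each element followed by the delimiter (the write() logic from the
    // diff) to a temporary local file, then returns the complete file content.
    static String writeAndReadBack(String[] elements, String rowDelimiter, Charset charset) throws IOException {
        Path file = Files.createTempFile("row-delimiter", ".txt");
        try {
            byte[] rowDelimiterBytes = rowDelimiter.getBytes(charset);
            try (OutputStream out = Files.newOutputStream(file)) {
                for (String element : elements) {
                    out.write(element.getBytes(charset));
                    out.write(rowDelimiterBytes);
                }
            }
            return new String(Files.readAllBytes(file), charset);
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException {
        String[] elements = {"A", "B", "C"};
        // {delimiter, full expected file content}
        String[][] cases = {
            {"\n", "A\nB\nC\n"},
            {"\r\n", "A\r\nB\r\nC\r\n"},
            {"*", "A*B*C*"},
            {"##", "A##B##C##"},
        };
        for (String[] c : cases) {
            String actual = writeAndReadBack(elements, c[0], StandardCharsets.UTF_8);
            if (!actual.equals(c[1])) {
                throw new AssertionError("unexpected output for delimiter " + c[0] + ": " + actual);
            }
        }
        System.out.println("all delimiters ok");
    }
}
```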


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-07-04 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r300413608
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -40,12 +40,18 @@
 
private transient Charset charset;
 
+   private String rowDelimiter;
+
+   private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+   private byte[] rowDelimiterBytes;
+
/**
 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
 * strings to bytes.
 */
public StringWriter() {
-   this("UTF-8");
+   this("UTF-8", "\n");
 
 Review comment:
   nit: you can keep this line as it was before (`this("UTF-8")`) to avoid a small code duplication.


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297580610
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -54,13 +59,15 @@ public StringWriter() {
 *
 * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
 */
-   public StringWriter(String charsetName) {
+   public StringWriter(String charsetName, String rowDelimiter) {
 
 Review comment:
   Please update the `@param` Javadoc.
   
   Secondly, I'm not sure, but isn't `StringWriter` part of the public API? If so, shouldn't we preserve the old constructor?
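One way to address both points, sketched on a hypothetical class that contains only the constructors: keep the old single-argument constructor and let each shorter constructor delegate to the next one, so existing callers of `StringWriter(String charsetName)` still compile and the default delimiter is spelled out only once, in `DEFAULT_ROW_DELIMITER`. The Javadoc wording for `rowDelimiter` and the getters are assumptions of this sketch.

```java
// Hypothetical class showing only the constructor chain (not Flink's StringWriter).
class ConstructorChainSketch {

    private static final String DEFAULT_ROW_DELIMITER = "\n";

    private final String charsetName;
    private final String rowDelimiter;

    /**
     * Uses the {@code "UTF-8"} charset and the default row delimiter.
     */
    ConstructorChainSketch() {
        this("UTF-8");
    }

    /**
     * Preserved for existing callers; uses the default row delimiter.
     *
     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
     */
    ConstructorChainSketch(String charsetName) {
        this(charsetName, DEFAULT_ROW_DELIMITER);
    }

    /**
     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
     * @param rowDelimiter String written after every element
     */
    ConstructorChainSketch(String charsetName, String rowDelimiter) {
        this.charsetName = charsetName;
        this.rowDelimiter = rowDelimiter;
    }

    String getCharsetName() {
        return charsetName;
    }

    String getRowDelimiter() {
        return rowDelimiter;
    }
}
```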


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297579509
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -82,7 +89,10 @@ public void open(FileSystem fs, Path path) throws IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
-   outputStream.write('\n');
+   if (rowDelimiterBytes == null) {
+   rowDelimiterBytes = rowDelimiter.getBytes(charset);
 
 Review comment:
   Why are you "caching" `rowDelimiterBytes`? Isn't this a premature optimisation?


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297581797
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
 ##
 @@ -32,7 +32,7 @@
 
@Test
public void testDuplicate() {
 
 Review comment:
   Please also add a unit test (in this file?) verifying that the custom delimiter is actually used.

