This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e30f3ad63c614db14bf94e988c40b02532e7f8e8
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed Aug 7 16:41:47 2019 +0200

    fixup! StringWriter support custom row delimiter
---
 .../streaming/connectors/fs/StringWriter.java      |   2 +-
 .../streaming/connectors/fs/StringWriterTest.java  | 100 ++++++++++-----------
 2 files changed, 46 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index ddea224..5f5c9b8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -45,7 +45,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 
        private static final String DEFAULT_ROW_DELIMITER = "\n";
 
-       private  byte[] rowDelimiterBytes;
+       private byte[] rowDelimiterBytes;
 
        /**
         * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 0574070..6122a2a 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -18,46 +18,47 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Assert;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.StringTokenizer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-
 /**
  * Tests for {@link StringWriter}.
  */
 public class StringWriterTest {
 
+       private static final String CHARSET_NAME = StandardCharsets.UTF_8.name();
+
        @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
+       public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
        private static MiniDFSCluster hdfsCluster;
        private static org.apache.hadoop.fs.FileSystem dfs;
 
        private static String outputDir;
 
-       @Before
-       public void createHDFS() throws IOException {
+       @BeforeClass
+       public static void createHDFS() throws IOException {
                org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
 
-               File dataDir = tempFolder.newFolder();
+               File dataDir = TEMPORARY_FOLDER.newFolder();
 
                conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
                MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
@@ -69,6 +70,14 @@ public class StringWriterTest {
                        + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
        }
 
+       @AfterClass
+       public static void destroyHDFS() {
+               if (hdfsCluster != null) {
+                       hdfsCluster.shutdown();
+                       hdfsCluster = null;
+               }
+       }
+
        @Test
        public void testDuplicate() {
                StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
@@ -80,62 +89,43 @@ public class StringWriterTest {
                writer.setSyncOnFlush(false);
                assertFalse(StreamWriterBaseComparator.equals(writer, other));
                assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
-
        }
 
        @Test
        public void testMultiRowdelimiters() throws IOException {
-               String rowDelimiter1 = "\n";
-               String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
-               Path testFile1 = new Path(outputDir + "/test01");
-               testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
-
-               String rowDelimiter2 = "\r\n";
-               String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
-               Path testFile2 = new Path(outputDir + "/test02");
-               testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
-
-               String rowDelimiter3 = "*";
-               String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
-               Path testFile3 = new Path(outputDir + "/test03");
-               testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
-
-               String rowDelimiter4 = "##";
-               String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
-               Path testFile4 = new Path(outputDir + "/test04");
-               testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
-
+               testRowDelimiter("\n");
+               testRowDelimiter("\r\n");
+               testRowDelimiter("*");
+               testRowDelimiter("##");
        }
 
-       private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
-               StringWriter<String> writer = new StringWriter(charset, rowDelimiter);
-               writer.open(dfs, outputFile);
-               StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
-               while (lineTokenizer.hasMoreTokens()){
-                       writer.write(lineTokenizer.nextToken());
-               }
-               writer.close();
-               FSDataInputStream inStream = dfs.open(outputFile);
-               byte[] buffer = new byte[inputData.getBytes(charset).length];
-               readFully(inStream, buffer);
-               inStream.close();
-               String outputData = new String(buffer, charset);
-               Assert.assertEquals(inputData, outputData);
+       private void testRowDelimiter(String rowDelimiter) throws IOException {
+               String[] inputData = new String[] {"A", "B", "C", "D", "E"};
 
-       }
+               Path outputPath = new Path(TEMPORARY_FOLDER.newFile().getAbsolutePath());
 
-       private void readFully(InputStream in, byte[] buffer) throws IOException {
-               int pos = 0;
-               int remaining = buffer.length;
-
-               while (remaining > 0) {
-                       int read = in.read(buffer, pos, remaining);
-                       if (read == -1) {
-                               throw new EOFException();
+               StringWriter<String> writer = new StringWriter(CHARSET_NAME, rowDelimiter);
+               try {
+                       writer.open(dfs, outputPath);
+                       for (String input: inputData) {
+                               writer.write(input);
                        }
+               }
+               finally {
+                       writer.close();
+               }
 
-                       pos += read;
-                       remaining -= read;
+               try (FSDataInputStream inStream = dfs.open(outputPath)) {
+                       String expectedOutput = String.join(rowDelimiter, inputData);
+                       byte[] buffer = new byte[expectedOutput.getBytes(CHARSET_NAME).length];
+                       readFully(inStream, buffer);
+
+                       String outputData = new String(buffer, CHARSET_NAME);
+                       assertEquals(expectedOutput, outputData);
                }
        }
+
+       private void readFully(InputStream in, byte[] buffer) throws IOException {
+               IOUtils.readFully(in, buffer, 0, buffer.length);
+       }
 }

Reply via email to