This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e30f3ad63c614db14bf94e988c40b02532e7f8e8
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed Aug 7 16:41:47 2019 +0200

    fixup! StringWriter support custom row delimiter
---
 .../streaming/connectors/fs/StringWriter.java | 2 +-
 .../streaming/connectors/fs/StringWriterTest.java | 100 ++++++++++---------
 2 files changed, 46 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index ddea224..5f5c9b8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -45,7 +45,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {

 	private static final String DEFAULT_ROW_DELIMITER = "\n";

-	private byte[] rowDelimiterBytes;
+	private byte[] rowDelimiterBytes;

 	/**
 	 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 0574070..6122a2a 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -18,46 +18,47 @@

 package org.apache.flink.streaming.connectors.fs;

+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;

 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Assert;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;

-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.StringTokenizer;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

-
 /**
  * Tests for {@link StringWriter}.
  */
 public class StringWriterTest {

+	private static final String CHARSET_NAME = StandardCharsets.UTF_8.name();
+
 	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

 	private static MiniDFSCluster hdfsCluster;
 	private static org.apache.hadoop.fs.FileSystem dfs;

 	private static String outputDir;

-	@Before
-	public void createHDFS() throws IOException {
+	@BeforeClass
+	public static void createHDFS() throws IOException {
 		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

-		File dataDir = tempFolder.newFolder();
+		File dataDir = TEMPORARY_FOLDER.newFolder();

 		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
 		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
@@ -69,6 +70,14 @@ public class StringWriterTest {
 			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
 	}

+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+			hdfsCluster = null;
+		}
+	}
+
 	@Test
 	public void testDuplicate() {
 		StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
@@ -80,62 +89,43 @@ public class StringWriterTest {
 		writer.setSyncOnFlush(false);
 		assertFalse(StreamWriterBaseComparator.equals(writer, other));
 		assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
-
 	}

 	@Test
 	public void testMultiRowdelimiters() throws IOException {
-		String rowDelimiter1 = "\n";
-		String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
-		Path testFile1 = new Path(outputDir + "/test01");
-		testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
-
-		String rowDelimiter2 = "\r\n";
-		String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
-		Path testFile2 = new Path(outputDir + "/test02");
-		testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
-
-		String rowDelimiter3 = "*";
-		String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
-		Path testFile3 = new Path(outputDir + "/test03");
-		testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
-
-		String rowDelimiter4 = "##";
-		String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
-		Path testFile4 = new Path(outputDir + "/test04");
-		testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
-
+		testRowDelimiter("\n");
+		testRowDelimiter("\r\n");
+		testRowDelimiter("*");
+		testRowDelimiter("##");
 	}

-	private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
-		StringWriter<String> writer = new StringWriter(charset, rowDelimiter);
-		writer.open(dfs, outputFile);
-		StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
-		while (lineTokenizer.hasMoreTokens()){
-			writer.write(lineTokenizer.nextToken());
-		}
-		writer.close();
-		FSDataInputStream inStream = dfs.open(outputFile);
-		byte[] buffer = new byte[inputData.getBytes(charset).length];
-		readFully(inStream, buffer);
-		inStream.close();
-		String outputData = new String(buffer, charset);
-		Assert.assertEquals(inputData, outputData);
+	private void testRowDelimiter(String rowDelimiter) throws IOException {
+		String[] inputData = new String[] {"A", "B", "C", "D", "E"};

-	}
+		Path outputPath = new Path(TEMPORARY_FOLDER.newFile().getAbsolutePath());

-	private void readFully(InputStream in, byte[] buffer) throws IOException {
-		int pos = 0;
-		int remaining = buffer.length;
-
-		while (remaining > 0) {
-			int read = in.read(buffer, pos, remaining);
-			if (read == -1) {
-				throw new EOFException();
+		StringWriter<String> writer = new StringWriter(CHARSET_NAME, rowDelimiter);
+		try {
+			writer.open(dfs, outputPath);
+			for (String input: inputData) {
+				writer.write(input);
 			}
+		}
+		finally {
+			writer.close();
+		}

-			pos += read;
-			remaining -= read;
+		try (FSDataInputStream inStream = dfs.open(outputPath)) {
+			String expectedOutput = String.join(rowDelimiter, inputData);
+			byte[] buffer = new byte[expectedOutput.getBytes(CHARSET_NAME).length];
+			readFully(inStream, buffer);
+
+			String outputData = new String(buffer, CHARSET_NAME);
+			assertEquals(expectedOutput, outputData);
 		}
 	}
+
+	private void readFully(InputStream in, byte[] buffer) throws IOException {
+		IOUtils.readFully(in, buffer, 0, buffer.length);
+	}
 }