This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f36df64e5a375ae203ba32662e5b01fcc38e340
Author: Ryan Tao <xmusa...@163.com>
AuthorDate: Fri Aug 16 23:15:36 2019 +0800

    [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before close
    
    This closes #9467
---
 .../api/scala/operators/ScalaCsvOutputFormat.java  |  1 +
 .../scala/operators/ScalaCsvOutputFormatTest.java  | 87 ++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
index ee5ba65..7fa540d 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -168,6 +168,7 @@ public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T>
        @Override
        public void close() throws IOException {
                if (wrt != null) {
+                       this.wrt.flush();
                        this.wrt.close();
                }
                super.close();
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java
new file mode 100644
index 0000000..7677ab0
--- /dev/null
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormatTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+import scala.Tuple3;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ScalaCsvOutputFormat}.
+ */
+public class ScalaCsvOutputFormatTest {
+
+       private String path;
+       private ScalaCsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat;
+
+       @Rule
+       public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+       @Before
+       public void setUp() throws Exception {
+               path = tmpFolder.newFile().getAbsolutePath();
+               csvOutputFormat = new ScalaCsvOutputFormat<>(new Path(path));
+               csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+               csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+               csvOutputFormat.open(0, 1);
+       }
+
+       @Test
+       public void testNullAllow() throws Exception {
+               try {
+                       csvOutputFormat.setAllowNullValues(true);
+                       csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
+               } finally {
+                       csvOutputFormat.close();
+               }
+               java.nio.file.Path p = Paths.get(path);
+               Assert.assertTrue(Files.exists(p));
+               List<String> lines = Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
+               Assert.assertEquals(1, lines.size());
+               Assert.assertEquals("One,,8", lines.get(0));
+       }
+
+       @Test
+       public void testNullDisallowOnDefault() throws Exception {
+               try {
+                       csvOutputFormat.setAllowNullValues(false);
+                       csvOutputFormat.writeRecord(new Tuple3<>("One", null, 8));
+                       fail("should fail with an exception");
+               } catch (RuntimeException e) {
+                       // expected
+               } finally {
+                       csvOutputFormat.close();
+               }
+       }
+}

Reply via email to