This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 66aa607845b [24469] Implement CsvIO.Write and supporting classes (#24630)
66aa607845b is described below

commit 66aa607845b43e4431723e9382755389bbba0412
Author: Damon <damondoug...@users.noreply.github.com>
AuthorDate: Fri Jan 27 23:45:59 2023 +0000

    [24469] Implement CsvIO.Write and supporting classes (#24630)
    
    This PR closes #24469 and addresses #20312 by implementing CsvIO.Write and supporting classes. Please see #24469 for more details.
---
 sdks/java/io/csv/build.gradle                      |  34 ++
 .../java/org/apache/beam/sdk/io/csv/CsvIO.java     | 458 +++++++++++++++++++++
 .../apache/beam/sdk/io/csv/CsvRowConversions.java  | 152 +++++++
 .../org/apache/beam/sdk/io/csv/package-info.java   |  20 +
 .../org/apache/beam/sdk/io/csv/CsvIOTestData.java  | 106 +++++
 .../apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java | 265 ++++++++++++
 .../org/apache/beam/sdk/io/csv/CsvIOWriteTest.java | 308 ++++++++++++++
 .../beam/sdk/io/csv/RowToCsvCSVFormatTest.java     | 353 ++++++++++++++++
 .../io/csv/RowToCsvPredefinedCSVFormatsTest.java   | 284 +++++++++++++
 settings.gradle.kts                                |   1 +
 10 files changed, 1981 insertions(+)
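
For reviewers, a minimal end-to-end sketch of how the new transform is meant to
be used. The Transaction bean and the output path are illustrative rather than
part of this commit; the bean mirrors the Transaction example in the CsvIO
Javadoc below.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.csv.CsvIO;
    import org.apache.beam.sdk.schemas.JavaBeanSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.commons.csv.CSVFormat;

    public class CsvIOWriteExample {

      // Minimal schema-inferable bean; Beam derives its Schema from the
      // getters and setters via JavaBeanSchema.
      @DefaultSchema(JavaBeanSchema.class)
      public static class Transaction {
        private Long transactionId;
        private String bank;
        private double purchaseAmount;

        public Long getTransactionId() { return transactionId; }
        public void setTransactionId(Long transactionId) { this.transactionId = transactionId; }
        public String getBank() { return bank; }
        public void setBank(String bank) { this.bank = bank; }
        public double getPurchaseAmount() { return purchaseAmount; }
        public void setPurchaseAmount(double purchaseAmount) { this.purchaseAmount = purchaseAmount; }
      }

      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        Transaction transaction = new Transaction();
        transaction.setTransactionId(12345L);
        transaction.setBank("A");
        transaction.setPurchaseAmount(10.23);
        PCollection<Transaction> transactions = pipeline.apply(Create.of(transaction));
        // Writes shard files named like path/to/folder/prefix-00000-of-00001.csv,
        // each starting with the sorted header bank,purchaseAmount,transactionId.
        transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
        pipeline.run().waitUntilFinish();
      }
    }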

diff --git a/sdks/java/io/csv/build.gradle b/sdks/java/io/csv/build.gradle
new file mode 100644
index 00000000000..ac1a4ba16e6
--- /dev/null
+++ b/sdks/java/io/csv/build.gradle
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.io.csv'
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: CSV"
+ext.summary = "IO to read and write CSV files."
+
+dependencies {
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation library.java.commons_csv
+    implementation library.java.vendored_guava_26_0_jre
+    testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+    testImplementation library.java.junit
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
\ No newline at end of file
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
new file mode 100644
index 00000000000..084e7de6310
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write} via {@link
+ * CsvIO#writeRows} or {@link CsvIO#write}. {@link CsvIO.Write} supports writing {@link Row} or
+ * custom Java types using an inferred {@link Schema}. Examples below show both scenarios. See the
+ * Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing the parts of {@link Schema}-aware types that do not
+ * contain any nested {@link FieldType}s such as {@link
+ * org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or repeated {@link
+ * org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
+ *
+ * <h3>Example usage:</h3>
+ *
+ * <p>Suppose we have the following <code>Transaction</code> class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }</pre>
+ *
+ * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or many CSV files,
+ * automatically creating the header based on its inferred {@link Schema}.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for every
+ * file. By default, {@link CsvIO.Write} writes all fields in <b>sorted order</b> of the field
+ * names.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link
+ * CSVFormat#withHeader}. Note, however, the following constraints:
+ *
+ * <ol>
+ *   <li>Each header column must match a field name in the {@link Schema}; matching is case
+ *       sensitive.
+ *   <li>Matching header columns must match {@link Schema} fields that are valid {@link FieldType}s;
+ *       see {@link #VALID_FIELD_TYPE_SET}.
+ *   <li>{@link CSVFormat} only allows repeated header columns when {@link
+ *       CSVFormat#withAllowDuplicateHeaderNames} is set.
+ * </ol>
+ *
+ * <p>The following example shows the use of {@link CSVFormat#withHeader} to control the order and
+ * subset of <code>Transaction</code> fields.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(
+ *  CsvIO
+ *    .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
+ * );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header is repeated for every
+ * file, but will only include the subset of fields in their listed order.
+ *
+ * <pre>{@code
+ * transactionId,purchaseAmount
+ * 12345,10.23
+ * 54321,54.65
+ * 98765,11.76
+ * }</pre>
+ *
+ * <p>In addition to header customization, {@link CsvIO.Write} supports {@link
+ * CSVFormat#withHeaderComments} as shown below. Note that {@link CSVFormat#withCommentMarker} is
+ * required when specifying header comments.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions
+ *    .apply(
+ *        CsvIO.<Transaction>write("path/to/folder/prefix",
+ *        CSVFormat.DEFAULT
+ *          .withCommentMarker('#')
+ *          .withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
+ *    );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header and header comments are
+ * repeated for every shard file.
+ *
+ * <pre>{@code
+ * # Bank Report
+ * # 1970-01-01
+ * # Operator: John Doe
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>A {@link PCollection} of {@link Row}s works just like the custom Java types illustrated above,
+ * except we use {@link CsvIO#writeRows} as shown below for the same {@code Transaction} class. We
+ * derive {@code Transaction}'s {@link Schema} using a {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that
+ * hard-coding the {@link Row}s below is for illustration purposes. Developers are instead
+ * encouraged to take advantage of {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction}.
+ *
+ * <pre>{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * PCollection<Row> transactions = pipeline.apply(Create.of(
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "A")
+ *    .withFieldValue("purchaseAmount", 10.23)
+ *    .withFieldValue("transactionId", "12345")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "B")
+ *    .withFieldValue("purchaseAmount", 54.65)
+ *    .withFieldValue("transactionId", "54321")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "C")
+ *    .withFieldValue("purchaseAmount", 11.76)
+ *    .withFieldValue("transactionId", "98765")
+ *    .build()
+ * ));
+ *
+ * transactions.apply(
+ *  CsvIO
+ *    .writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
+ * );
+ * }</pre>
+ *
+ * <p>Writing the transactions {@link PCollection} of {@link Row}s would yield the following CSV
+ * file content.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>{@link CsvIO.Write} does not support the following {@link CSVFormat} properties and will throw an
+ * {@link IllegalArgumentException}.
+ *
+ * <ul>
+ *   <li>{@link CSVFormat#withAllowMissingColumnNames}
+ *   <li>{@link CSVFormat#withAutoFlush}
+ *   <li>{@link CSVFormat#withIgnoreHeaderCase}
+ *   <li>{@link CSVFormat#withIgnoreSurroundingSpaces}
+ * </ul>
+ */
+public class CsvIO {
+  /**
+   * The {@link Schema.FieldType}s that {@link CsvIO} supports when converting between CSV records
+   * and {@link Row} fields.
+   *
+   * <ul>
+   *   <li>{@link FieldType#BYTE}
+   *   <li>{@link FieldType#BOOLEAN}
+   *   <li>{@link FieldType#DATETIME}
+   *   <li>{@link FieldType#DECIMAL}
+   *   <li>{@link FieldType#DOUBLE}
+   *   <li>{@link FieldType#INT16}
+   *   <li>{@link FieldType#INT32}
+   *   <li>{@link FieldType#INT64}
+   *   <li>{@link FieldType#FLOAT}
+   *   <li>{@link FieldType#STRING}
+   * </ul>
+   */
+  public static final Set<Schema.FieldType> VALID_FIELD_TYPE_SET =
+      ImmutableSet.of(
+          FieldType.BYTE,
+          FieldType.BOOLEAN,
+          FieldType.DATETIME,
+          FieldType.DECIMAL,
+          FieldType.DOUBLE,
+          FieldType.INT16,
+          FieldType.INT32,
+          FieldType.INT64,
+          FieldType.FLOAT,
+          FieldType.STRING);
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in {@link CSVFormat} format. */
+  public static <T> Write<T> write(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<T>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** Instantiates a {@link Write} for writing {@link Row}s in {@link CSVFormat} format. */
+  public static Write<Row> writeRows(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<Row>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** {@link PTransform} for writing CSV files. */
+  @AutoValue
+  public abstract static class Write<T>
+      extends PTransform<PCollection<T>, WriteFilesResult<String>> {
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withCompression(compression)).build();
+    }
+
+    /** Specifies that all data is written without spilling, simplifying the pipeline. */
+    public Write<T> withNoSpilling() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * TextIO.Write#withNumShards}.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withNumShards(numShards)).build();
+    }
+
+    /**
+     * Forces a single file as output and empty shard name template. See {@link
+     * TextIO.Write#withoutSharding}.
+     */
+    public Write<T> withoutSharding() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withoutSharding()).build();
+    }
+
+    /**
+     * Uses the given {@link ShardNameTemplate} for naming output files. See 
{@link
+     * TextIO.Write#withShardNameTemplate}.
+     */
+    public Write<T> withShardTemplate(String shardTemplate) {
+      return toBuilder()
+          .setTextIOWrite(getTextIOWrite().withShardNameTemplate(shardTemplate))
+          .build();
+    }
+
+    /** Configures the filename suffix for written files. See {@link TextIO.Write#withSuffix}. */
+    public Write<T> withSuffix(String suffix) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withSuffix(suffix)).build();
+    }
+
+    /**
+     * Sets the base directory used to generate temporary files. See {@link
+     * TextIO.Write#withTempDirectory}.
+     */
+    public Write<T> withTempDirectory(ResourceId tempDirectory) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withTempDirectory(tempDirectory)).build();
+    }
+
+    /**
+     * Preserves windowing of input elements and writes them to files based on the element's window.
+     * See {@link TextIO.Write#withWindowedWrites}.
+     */
+    public Write<T> withWindowedWrites() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withWindowedWrites()).build();
+    }
+
+    /**
+     * Returns a transform for writing to text files like this one but that has the given {@link
+     * FileBasedSink.WritableByteChannelFactory} to be used by the {@link FileBasedSink} during
+     * output. See {@link TextIO.Write#withWritableByteChannelFactory}.
+     */
+    public Write<T> withWritableByteChannelFactory(
+        FileBasedSink.WritableByteChannelFactory writableByteChannelFactory) {
+      return toBuilder()
+          .setTextIOWrite(
+              getTextIOWrite().withWritableByteChannelFactory(writableByteChannelFactory))
+          .build();
+    }
+
+    /** The underlying {@link TextIO.Write} that writes converted input to CSV-formatted output. */
+    abstract TextIO.Write getTextIOWrite();
+
+    /** The {@link CSVFormat} that drives the header, header comments, and CSV row formatting. */
+    abstract CSVFormat getCSVFormat();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * The underlying {@link TextIO.Write} that writes converted input to CSV-formatted output.
+       */
+      abstract Builder<T> setTextIOWrite(TextIO.Write value);
+
+      /** The {@link CSVFormat} used to convert input. Defaults to {@link CSVFormat#DEFAULT}. */
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract CSVFormat getCSVFormat();
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+
+        if (getCSVFormat().getHeaderComments() != null) {
+          checkArgument(
+              getCSVFormat().isCommentMarkerSet(),
+              "CSVFormat withCommentMarker required when withHeaderComments");
+        }
+
+        return autoBuild();
+      }
+    }
+
+    @Override
+    public WriteFilesResult<String> expand(PCollection<T> input) {
+      if (!input.hasSchema()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s requires an input Schema. Note that only Row or user 
classes are supported. Consider using TextIO or FileIO directly when writing 
primitive types",
+                Write.class.getName()));
+      }
+
+      Schema schema = input.getSchema();
+
+      RowCoder rowCoder = RowCoder.of(schema);
+
+      PCollection<Row> rows =
+          input
+              .apply("To Rows", 
MapElements.into(rows()).via(input.getToRowFunction()))
+              .setCoder(rowCoder);
+
+      CSVFormat csvFormat = buildHeaderFromSchemaIfNeeded(getCSVFormat(), schema);
+
+      TextIO.Write write = getTextIOWrite();
+
+      write = writeWithCSVFormatHeaderAndComments(csvFormat, write);
+
+      SerializableFunction<Row, String> toCsvFn =
+          CsvRowConversions.RowToCsv.builder()
+              .setCSVFormat(csvFormat)
+              .setSchema(input.getSchema())
+              .build();
+
+      PCollection<String> csv = rows.apply("To CSV", 
MapElements.into(strings()).via(toCsvFn));
+
+      return csv.apply("Write CSV", write.withOutputFilenames());
+    }
+
+    private static CSVFormat buildHeaderFromSchemaIfNeeded(CSVFormat csvFormat, Schema schema) {
+      if (csvFormat.getHeader() == null) {
+        csvFormat = csvFormat.withHeader(schema.sorted().getFieldNames().toArray(new String[0]));
+      }
+
+      return csvFormat;
+    }
+
+    private static TextIO.Write writeWithCSVFormatHeaderAndComments(
+        CSVFormat csvFormat, TextIO.Write write) {
+
+      if (csvFormat.getSkipHeaderRecord()) {
+        return write;
+      }
+
+      String[] header = requireNonNull(csvFormat.getHeader());
+      List<String> result = new ArrayList<>();
+      if (csvFormat.getHeaderComments() != null) {
+        for (String comment : csvFormat.getHeaderComments()) {
+          result.add(csvFormat.getCommentMarker() + " " + comment);
+        }
+      }
+
+      CSVFormat withoutHeaderComments = csvFormat.withHeaderComments();
+
+      result.add(
+          withoutHeaderComments
+              // The withSkipHeaderRecord parameter prevents CSVFormat from outputting two copies of
+              // the header.
+              .withSkipHeaderRecord()
+              .format((Object[]) header));
+
+      return write.withHeader(String.join("\n", result));
+    }
+  }
+
+  private static TextIO.Write createDefaultTextIOWrite(String to) {
+    return TextIO.write().to(to).withSuffix(DEFAULT_FILENAME_SUFFIX);
+  }
+}
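
Since TextIO.Write#withHeader prepends its argument verbatim to every shard,
writeWithCSVFormatHeaderAndComments above pre-renders the comment lines and the
header record into one string. A standalone sketch of the string it builds
(values illustrative; commons-csv API as used by this commit):

    import org.apache.commons.csv.CSVFormat;

    public class HeaderBlockSketch {
      public static void main(String[] args) {
        CSVFormat csvFormat =
            CSVFormat.DEFAULT
                .withCommentMarker('#')
                .withHeaderComments("Bank Report", "1970-01-01")
                .withHeader("transactionId", "purchaseAmount");
        StringBuilder block = new StringBuilder();
        // Comment lines are prefixed manually, mirroring the helper above.
        for (String comment : csvFormat.getHeaderComments()) {
          block.append(csvFormat.getCommentMarker()).append(' ').append(comment).append('\n');
        }
        // withHeaderComments() clears the comments and withSkipHeaderRecord()
        // stops format() from emitting the header record a second time.
        block.append(
            csvFormat
                .withHeaderComments()
                .withSkipHeaderRecord()
                .format((Object[]) csvFormat.getHeader()));
        // Prints:
        // # Bank Report
        // # 1970-01-01
        // transactionId,purchaseAmount
        System.out.println(block);
      }
    }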
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java
new file mode 100644
index 00000000000..6faccfe021f
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIO.VALID_FIELD_TYPE_SET;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Contains classes and methods to help with converting between {@link Row} and CSV strings. */
+class CsvRowConversions {
+
+  /** Converts between {@link Row} and CSV string using a {@link CSVFormat}. */
+  @AutoValue
+  abstract static class RowToCsv implements SerializableFunction<Row, String> {
+
+    static Builder builder() {
+      return new AutoValue_CsvRowConversions_RowToCsv.Builder();
+    }
+
+    /** The expected {@link Schema} of the {@link Row} input. */
+    abstract Schema getSchema();
+
+    /** The {@link CSVFormat} of the converted {@link Row} input. */
+    abstract CSVFormat getCSVFormat();
+
+    /** Converts a {@link Row} to a CSV string formatted using {@link #getCSVFormat}. */
+    @Override
+    public String apply(Row input) {
+      Row safeInput = checkNotNull(input);
+      String[] header = getHeader();
+      Object[] values = new Object[header.length];
+      for (int i = 0; i < header.length; i++) {
+        values[i] = safeInput.getValue(header[i]);
+      }
+      return getCSVFormat().format(values);
+    }
+
+    @NonNull
+    String[] getHeader() {
+      return checkNotNull(getCSVFormat().getHeader());
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /** The expected {@link Schema} of the {@link Row} input. */
+      abstract Builder setSchema(Schema schema);
+
+      abstract Schema getSchema();
+
+      /** The {@link CSVFormat} of the converted {@link Row} input. */
+      abstract Builder setCSVFormat(CSVFormat format);
+
+      abstract CSVFormat getCSVFormat();
+
+      abstract RowToCsv autoBuild();
+
+      final RowToCsv build() {
+        checkArgument(getSchema().getFieldCount() > 0, "Schema has no fields");
+        setCSVFormat(
+            getCSVFormat()
+                // CSVFormat was designed to write to a single file.
+                // Therefore, we need to apply withSkipHeaderRecord to prevent CSVFormat from applying
+                // its header to each converted Row in the context of RowToCsv.
+                .withSkipHeaderRecord()
+                // Delegate to TextIO.Write.withDelimiter instead.
+                .withRecordSeparator(' ')
+                .withHeaderComments());
+        validateCSVFormat(getCSVFormat());
+        validateHeaderAgainstSchema(getCSVFormat().getHeader(), getSchema());
+
+        return autoBuild();
+      }
+    }
+  }
+
+  private static void validateCSVFormat(CSVFormat csvFormat) {
+    String[] header = checkNotNull(csvFormat.getHeader(), "CSVFormat withHeader is required");
+
+    checkArgument(header.length > 0, "CSVFormat withHeader requires at least one column");
+
+    checkArgument(!csvFormat.getAutoFlush(), "withAutoFlush is an illegal CSVFormat setting");
+
+    checkArgument(
+        !csvFormat.getIgnoreHeaderCase(), "withIgnoreHeaderCase is an illegal CSVFormat setting");
+
+    checkArgument(
+        !csvFormat.getAllowMissingColumnNames(),
+        "withAllowMissingColumnNames is an illegal CSVFormat setting");
+
+    checkArgument(
+        !csvFormat.getIgnoreSurroundingSpaces(),
+        "withIgnoreSurroundingSpaces is an illegal CSVFormat setting");
+  }
+
+  private static void validateHeaderAgainstSchema(String[] csvHeader, Schema schema) {
+    Set<String> distinctColumns = new HashSet<>(Arrays.asList(csvHeader));
+    List<String> mismatchColumns = new ArrayList<>();
+    List<String> invalidTypes = new ArrayList<>();
+    Set<TypeName> validTypeNames =
+        VALID_FIELD_TYPE_SET.stream().map(FieldType::getTypeName).collect(Collectors.toSet());
+    for (String column : distinctColumns) {
+      if (!schema.hasField(column)) {
+        mismatchColumns.add(column);
+        continue;
+      }
+      TypeName typeName = schema.getField(column).getType().getTypeName();
+      if (!validTypeNames.contains(typeName)) {
+        invalidTypes.add(column);
+      }
+    }
+
+    checkArgument(
+        mismatchColumns.isEmpty(),
+        "columns in CSVFormat header do not exist in Schema: %s",
+        String.join(",", mismatchColumns));
+    checkArgument(
+        invalidTypes.isEmpty(),
+        "columns in header match fields in Schema with invalid types: %s. See 
CsvIO#VALID_FIELD_TYPE_SET for a list of valid field types.",
+        String.join(",", invalidTypes));
+  }
+}
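
A standalone sketch of RowToCsv in isolation (the class is package-private, so
this would live alongside the tests in the same package; the schema and values
are illustrative):

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;
    import org.apache.commons.csv.CSVFormat;

    public class RowToCsvSketch {
      public static void main(String[] args) {
        Schema schema =
            Schema.builder().addStringField("bank").addDoubleField("purchaseAmount").build();
        Row row =
            Row.withSchema(schema)
                .withFieldValue("bank", "A")
                .withFieldValue("purchaseAmount", 10.23)
                .build();
        // The CSVFormat header drives both the subset of fields and the output
        // order, so this prints "10.23,A" rather than the schema order "A,10.23".
        CsvRowConversions.RowToCsv rowToCsv =
            CsvRowConversions.RowToCsv.builder()
                .setSchema(schema)
                .setCSVFormat(CSVFormat.DEFAULT.withHeader("purchaseAmount", "bank"))
                .build();
        System.out.println(rowToCsv.apply(row));
      }
    }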
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java
new file mode 100644
index 00000000000..4cab7a9abf4
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing CSV files. */
+package org.apache.beam.sdk.io.csv;
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestData.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestData.java
new file mode 100644
index 00000000000..e1048b195d2
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestData.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.allPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.nullableAllPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.nullableAllPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.timeContaining;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.timeContainingToRowFn;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+
+/** Shared data for use in {@link CsvIO} tests and related classes. */
+class CsvIOTestData {
+  static final CsvIOTestData DATA = new CsvIOTestData();
+
+  private CsvIOTestData() {}
+
+  final Row allPrimitiveDataTypesRow =
+      requireNonNull(
+          allPrimitiveDataTypesToRowFn()
+              .apply(
+                  allPrimitiveDataTypes(
+                      false, (byte) 1, BigDecimal.TEN, 1.0, 1.0f, (short) 1.0, 1, 1L, "a")));
+
+  final Row allPrimitiveDataTypesRowWithPadding =
+      requireNonNull(
+          allPrimitiveDataTypesToRowFn()
+              .apply(
+                  allPrimitiveDataTypes(
+                      false,
+                      (byte) 1,
+                      BigDecimal.TEN,
+                      1.0,
+                      1.0f,
+                      (short) 1.0,
+                      1,
+                      1L,
+                      "       a           ")));
+
+  final List<Row> allPrimitiveDataTypeRows =
+      Stream.of(
+              allPrimitiveDataTypes(
+                  false, (byte) 1, BigDecimal.TEN, 1.0, 1.0f, (short) 1.0, 1, 1L, "a"),
+              allPrimitiveDataTypes(
+                  false,
+                  (byte) 2,
+                  BigDecimal.TEN.add(BigDecimal.TEN),
+                  2.0,
+                  2.0f,
+                  (short) 2.0,
+                  2,
+                  2L,
+                  "b"),
+              allPrimitiveDataTypes(
+                  false,
+                  (byte) 3,
+                  BigDecimal.TEN.add(BigDecimal.TEN).add(BigDecimal.TEN),
+                  3.0,
+                  3.0f,
+                  (short) 3.0,
+                  3,
+                  3L,
+                  "c"))
+          .map(allPrimitiveDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+
+  final Row nullableTypesRowAllNull =
+      requireNonNull(
+          nullableAllPrimitiveDataTypesToRowFn()
+              .apply(nullableAllPrimitiveDataTypes(null, null, null, null, null, null)));
+
+  final Row nullableTypesRowSomeNull =
+      requireNonNull(
+          nullableAllPrimitiveDataTypesToRowFn()
+              .apply(nullableAllPrimitiveDataTypes(true, null, null, 1, null, "a")));
+
+  final Row timeContainingRow =
+      timeContainingToRowFn()
+          .apply(
+              timeContaining(
+                  Instant.ofEpochMilli(1L), Collections.singletonList(Instant.ofEpochMilli(1L))));
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java
new file mode 100644
index 00000000000..c7ccf079f33
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Instant;
+
+// TODO(https://github.com/apache/beam/issues/24980): replace with common schema-aware classes; see
+// task description.
+/** Classes and data to drive CsvIO tests. */
+class CsvIOTestJavaBeans {
+
+  private static final AutoValueSchema DEFAULT_SCHEMA_PROVIDER = new AutoValueSchema();
+
+  /** Convenience method for {@link AllPrimitiveDataTypes} instantiation. */
+  static AllPrimitiveDataTypes allPrimitiveDataTypes(
+      Boolean aBoolean,
+      Byte aByte,
+      BigDecimal aDecimal,
+      Double aDouble,
+      Float aFloat,
+      Short aShort,
+      Integer anInteger,
+      Long aLong,
+      String aString) {
+    return new AutoValue_CsvIOTestJavaBeans_AllPrimitiveDataTypes.Builder()
+        .setABoolean(aBoolean)
+        .setAByte(aByte)
+        .setADecimal(aDecimal)
+        .setADouble(aDouble)
+        .setAFloat(aFloat)
+        .setAShort(aShort)
+        .setAnInteger(anInteger)
+        .setALong(aLong)
+        .setAString(aString)
+        .build();
+  }
+
+  /** Convenience method for {@link NullableAllPrimitiveDataTypes} instantiation. */
+  static NullableAllPrimitiveDataTypes nullableAllPrimitiveDataTypes(
+      @Nullable Boolean aBoolean,
+      @Nullable Double aDouble,
+      @Nullable Float aFloat,
+      @Nullable Integer anInteger,
+      @Nullable Long aLong,
+      @Nullable String aString) {
+    return new AutoValue_CsvIOTestJavaBeans_NullableAllPrimitiveDataTypes.Builder()
+        .setABoolean(aBoolean)
+        .setADouble(aDouble)
+        .setAFloat(aFloat)
+        .setAnInteger(anInteger)
+        .setALong(aLong)
+        .setAString(aString)
+        .build();
+  }
+
+  /** Convenience method for {@link TimeContaining} instantiation. */
+  static TimeContaining timeContaining(Instant instant, List<Instant> instantList) {
+    return new AutoValue_CsvIOTestJavaBeans_TimeContaining.Builder()
+        .setInstant(instant)
+        .setInstantList(instantList)
+        .build();
+  }
+
+  private static final TypeDescriptor<AllPrimitiveDataTypes>
+      ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(AllPrimitiveDataTypes.class);
+
+  /** The schema for {@link AllPrimitiveDataTypes}. */
+  static final Schema ALL_PRIMITIVE_DATA_TYPES_SCHEMA =
+      requireNonNull(DEFAULT_SCHEMA_PROVIDER.schemaFor(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR));
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from an {@link AllPrimitiveDataTypes} to a
+   * {@link Row}.
+   */
+  static SerializableFunction<AllPrimitiveDataTypes, Row> allPrimitiveDataTypesToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  private static final TypeDescriptor<NullableAllPrimitiveDataTypes>
+      NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR =
+          TypeDescriptor.of(NullableAllPrimitiveDataTypes.class);
+
+  /** The schema for {@link NullableAllPrimitiveDataTypes}. */
+  static final Schema NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA =
+      requireNonNull(
+          DEFAULT_SCHEMA_PROVIDER.schemaFor(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR));
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link NullableAllPrimitiveDataTypes}
+   * to a {@link Row}.
+   */
+  static SerializableFunction<NullableAllPrimitiveDataTypes, Row>
+      nullableAllPrimitiveDataTypesToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  private static final TypeDescriptor<TimeContaining> TIME_CONTAINING_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(TimeContaining.class);
+
+  /** The schema for {@link TimeContaining}. */
+  static final Schema TIME_CONTAINING_SCHEMA =
+      requireNonNull(DEFAULT_SCHEMA_PROVIDER.schemaFor(TIME_CONTAINING_TYPE_DESCRIPTOR));
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link TimeContaining} to a {@link
+   * Row}.
+   */
+  static SerializableFunction<TimeContaining, Row> timeContainingToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(TIME_CONTAINING_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Contains all primitive Java types (e.g. String, Integer) and {@link BigDecimal}. The purpose
+   * of this class is to test schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class AllPrimitiveDataTypes implements Serializable {
+
+    abstract Boolean getABoolean();
+
+    abstract Byte getAByte();
+
+    abstract BigDecimal getADecimal();
+
+    abstract Double getADouble();
+
+    abstract Float getAFloat();
+
+    abstract Short getAShort();
+
+    abstract Integer getAnInteger();
+
+    abstract Long getALong();
+
+    abstract String getAString();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setABoolean(Boolean value);
+
+      abstract Builder setAByte(Byte value);
+
+      abstract Builder setADecimal(BigDecimal value);
+
+      abstract Builder setADouble(Double value);
+
+      abstract Builder setAFloat(Float value);
+
+      abstract Builder setAShort(Short value);
+
+      abstract Builder setAnInteger(Integer value);
+
+      abstract Builder setALong(Long value);
+
+      abstract Builder setAString(String value);
+
+      abstract AllPrimitiveDataTypes build();
+    }
+  }
+
+  /**
+   * Contains all nullable primitive Java types (e.g. String, Integer) and {@link BigDecimal}.
+   * The purpose of this class is to test schema-aware PTransforms with flat {@link Schema} {@link
+   * Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class NullableAllPrimitiveDataTypes implements Serializable {
+
+    @Nullable
+    abstract Boolean getABoolean();
+
+    @Nullable
+    abstract Double getADouble();
+
+    @Nullable
+    abstract Float getAFloat();
+
+    @Nullable
+    abstract Integer getAnInteger();
+
+    @Nullable
+    abstract Long getALong();
+
+    @Nullable
+    abstract String getAString();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setABoolean(Boolean value);
+
+      abstract Builder setADouble(Double value);
+
+      abstract Builder setAFloat(Float value);
+
+      abstract Builder setAnInteger(Integer value);
+
+      abstract Builder setALong(Long value);
+
+      abstract Builder setAString(String value);
+
+      abstract NullableAllPrimitiveDataTypes build();
+    }
+  }
+
+  /**
+   * Contains time-related types. The purpose of this class is to test schema-aware PTransforms
+   * with {@link Row}s containing time-related {@link Schema.FieldType}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class TimeContaining {
+
+    abstract Instant getInstant();
+
+    abstract List<Instant> getInstantList();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setInstant(Instant value);
+
+      abstract Builder setInstantList(List<Instant> value);
+
+      abstract TimeContaining build();
+    }
+  }
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOWriteTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOWriteTest.java
new file mode 100644
index 00000000000..dc83007a2a9
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOWriteTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.AllPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CsvIO.Write}. */
+@RunWith(JUnit4.class)
+public class CsvIOWriteTest {
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline errorPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Test
+  public void headersWithCommentsWrittenFirstOnEachShard() {
+    File folder =
+        createFolder(
+            AllPrimitiveDataTypes.class.getSimpleName(),
+            "headersWithCommentsWrittenFirstOnEachShard");
+
+    PCollection<Row> input =
+        writePipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypeRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+    String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger";
+    CSVFormat csvFormat =
+        CSVFormat.DEFAULT.withHeaderComments("foo", "bar", 
"baz").withCommentMarker('#');
+
+    input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(3));
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<FileIO.ReadableFile> files =
+        readPipeline
+            .apply(FileIO.match().filepattern(toFilenamePrefix(folder) + "*"))
+            .apply(FileIO.readMatches());
+    PAssert.that(files)
+        .satisfies(
+            (Iterable<FileIO.ReadableFile> itr) -> {
+              Iterable<FileIO.ReadableFile> safeItr = requireNonNull(itr);
+              for (FileIO.ReadableFile file : safeItr) {
+                try {
+                  List<String> lines = Splitter.on('\n').splitToList(file.readFullyAsUTF8String());
+                  assertFalse(lines.isEmpty());
+                  assertEquals("# foo", lines.get(0));
+                  assertEquals("# bar", lines.get(1));
+                  assertEquals("# baz", lines.get(2));
+                  assertEquals(expectedHeader, lines.get(3));
+
+                  assertTrue(
+                      lines.subList(4, lines.size()).stream().noneMatch(expectedHeader::equals));
+                } catch (IOException e) {
+                  fail(e.getMessage());
+                }
+              }
+              return null;
+            });
+
+    readPipeline.run();
+  }
+
+  @Test
+  public void headersWrittenFirstOnEachShard() {
+    File folder =
+        createFolder(AllPrimitiveDataTypes.class.getSimpleName(), "headersWrittenFirstOnEachShard");
+
+    PCollection<Row> input =
+        writePipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypeRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+    String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger";
+    CSVFormat csvFormat = CSVFormat.DEFAULT;
+
+    input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(3));
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<FileIO.ReadableFile> files =
+        readPipeline
+            .apply(FileIO.match().filepattern(toFilenamePrefix(folder) + "*"))
+            .apply(FileIO.readMatches());
+    PAssert.that(files)
+        .satisfies(
+            (Iterable<FileIO.ReadableFile> itr) -> {
+              Iterable<FileIO.ReadableFile> safeItr = requireNonNull(itr);
+              for (FileIO.ReadableFile file : safeItr) {
+                try {
+                  List<String> lines = Splitter.on('\n').splitToList(file.readFullyAsUTF8String());
+                  assertFalse(lines.isEmpty());
+                  assertEquals(expectedHeader, lines.get(0));
+                  assertTrue(
+                      lines.subList(1, lines.size()).stream().noneMatch(expectedHeader::equals));
+                } catch (IOException e) {
+                  fail(e.getMessage());
+                }
+              }
+              return null;
+            });
+
+    readPipeline.run();
+  }
+
+  @Test
+  public void writesUserDefinedTypes() {
+    File folder =
+        createFolder(AllPrimitiveDataTypes.class.getSimpleName(), "writesUserDefinedTypes");
+
+    PCollection<AllPrimitiveDataTypes> input =
+        writePipeline.apply(
+            Create.of(
+                allPrimitiveDataTypes(
+                    false, (byte) 1, BigDecimal.TEN, 1.0, 1.0f, (short) 1.0, 1, 1L, "a"),
+                allPrimitiveDataTypes(
+                    false,
+                    (byte) 2,
+                    BigDecimal.TEN.add(BigDecimal.TEN),
+                    2.0,
+                    2.0f,
+                    (short) 2.0,
+                    2,
+                    2L,
+                    "b"),
+                allPrimitiveDataTypes(
+                    false,
+                    (byte) 3,
+                    BigDecimal.TEN.add(BigDecimal.TEN).add(BigDecimal.TEN),
+                    3.0,
+                    3.0f,
+                    (short) 3.0,
+                    3,
+                    3L,
+                    "c")));
+
+    String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger";
+    CSVFormat csvFormat = CSVFormat.DEFAULT;
+    input.apply(
+        CsvIO.<AllPrimitiveDataTypes>write(toFilenamePrefix(folder), csvFormat).withNumShards(1));
+
+    writePipeline.run().waitUntilFinish();
+
+    PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*")))
+        .containsInAnyOrder(
+            expectedHeader,
+            "false,1,10,1.0,1.0,1,1,a,1",
+            "false,2,20,2.0,2.0,2,2,b,2",
+            "false,3,30,3.0,3.0,3,3,c,3");
+
+    readPipeline.run();
+  }
+
+  @Test
+  public void nonNullCSVFormatHeaderWritesSelectedSchemaFields() {
+    File folder =
+        createFolder(
+            AllPrimitiveDataTypes.class.getSimpleName(),
+            "nonNullCSVFormatHeaderWritesSelectedSchemaFields");
+    PCollection<Row> input =
+        writePipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypeRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+
+    String expectedHeader = "aFloat,aString,aDecimal";
+
+    CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader("aFloat", "aString", 
"aDecimal");
+    input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(1));
+    writePipeline.run().waitUntilFinish();
+
+    PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*")))
+        .containsInAnyOrder(expectedHeader, "1.0,a,10", "2.0,b,20", "3.0,c,30");
+
+    readPipeline.run();
+  }
+
+  @Test
+  public void withSkipHeaderRecordOnlyWritesRows() {
+    File folder =
+        createFolder(
+            AllPrimitiveDataTypes.class.getSimpleName(),
+            "withSkipHeaderRecordWritesCsvFilesWithoutHeaders");
+
+    CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord();
+
+    PCollection<Row> input =
+        writePipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypeRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+
+    input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(1));
+
+    writePipeline.run().waitUntilFinish();
+
+    PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*")))
+        .containsInAnyOrder(
+            "false,1,10,1.0,1.0,1,1,a,1",
+            "false,2,20,2.0,2.0,2,2,b,2",
+            "false,3,30,3.0,3.0,3,3,c,3");
+
+    readPipeline.run();
+  }
+
+  @Test
+  public void nullCSVFormatHeaderWritesAllSchemaFields() {
+    File folder =
+        createFolder(
+            AllPrimitiveDataTypes.class.getSimpleName(),
+            "nullCSVFormatHeaderWritesAllSchemaFields");
+
+    PCollection<Row> input =
+        writePipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypeRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+    String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger";
+    CSVFormat csvFormat = CSVFormat.DEFAULT;
+
+    input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(1));
+
+    writePipeline.run().waitUntilFinish();
+
+    PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*")))
+        .containsInAnyOrder(
+            expectedHeader,
+            "false,1,10,1.0,1.0,1,1,a,1",
+            "false,2,20,2.0,2.0,2,2,b,2",
+            "false,3,30,3.0,3.0,3,3,c,3");
+
+    readPipeline.run();
+  }
+
+  @Test
+  public void nonNullHeaderCommentsRequiresHeaderMarker() {
+    PCollection<Row> input =
+        errorPipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypeRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+    IllegalArgumentException nullHeaderMarker =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                input.apply(
+                    CsvIO.writeRows(
+                        "somewhere",
+                        CSVFormat.DEFAULT.withHeaderComments("some", "header", 
"comments"))));
+
+    assertEquals(
+        "CSVFormat withCommentMarker required when withHeaderComments",
+        nullHeaderMarker.getMessage());
+  }
+
+  private static String toFilenamePrefix(File folder) {
+    checkArgument(folder.isDirectory());
+    return folder.getAbsolutePath() + "/out";
+  }
+
+  private File createFolder(String... paths) {
+    try {
+      return tempFolder.newFolder(paths);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvCSVFormatTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvCSVFormatTest.java
new file mode 100644
index 00000000000..645710268eb
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvCSVFormatTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link org.apache.commons.csv.CSVFormat} settings in the context of {@link
+ * CsvRowConversions.RowToCsv}.
+ */
+@RunWith(JUnit4.class)
+public class RowToCsvCSVFormatTest {
+  @Test
+  public void invalidCSVFormatHeader() {
+    NullPointerException nullHeaderError =
+        assertThrows(
+            NullPointerException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT)
+                    .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                    .build());
+    assertEquals("CSVFormat withHeader is required", 
nullHeaderError.getMessage());
+
+    IllegalArgumentException emptyHeaderError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT.withHeader())
+                    .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                    .build());
+
+    assertEquals(
+        "CSVFormat withHeader requires at least one column", 
emptyHeaderError.getMessage());
+
+    IllegalArgumentException mismatchHeaderError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(
+                        CSVFormat.DEFAULT.withHeader("aString", "idontexist1", "idontexist2"))
+                    .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                    .build());
+
+    assertEquals(
+        "columns in CSVFormat header do not exist in Schema: 
idontexist2,idontexist1",
+        mismatchHeaderError.getMessage());
+  }
+
+  @Test
+  public void invalidSchema() {
+    IllegalArgumentException emptySchemaError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT.withHeader())
+                    .setSchema(Schema.of())
+                    .build());
+
+    assertEquals("Schema has no fields", emptySchemaError.getMessage());
+
+    IllegalArgumentException invalidArrayFieldsError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT.withHeader("instant", "instantList"))
+                    .setSchema(TIME_CONTAINING_SCHEMA)
+                    .build());
+
+    assertEquals(
+        "columns in header match fields in Schema with invalid types: 
instantList. See CsvIO#VALID_FIELD_TYPE_SET for a list of valid field types.",
+        invalidArrayFieldsError.getMessage());
+
+    // Should not throw Exception when limited to valid fields.
+    CsvRowConversions.RowToCsv.builder()
+        .setCSVFormat(CSVFormat.DEFAULT.withHeader("instant"))
+        .setSchema(TIME_CONTAINING_SCHEMA)
+        .build();
+  }
+
+  @Test
+  public void withAllowDuplicateHeaderNamesDuplicatesRowFieldOutput() {
+    assertEquals(
+        "allowDuplicateHeaderNames=true",
+        "a,a,a",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withAllowDuplicateHeaderNames(true)
+                .withHeader("aString", "aString", "aString")));
+  }
+
+  @Test
+  public void withAllowMissingColumnNamesSettingThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            "allowMissingColumnNames=true",
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withAllowMissingColumnNames(true)));
+
+    assertEquals(
+        "withAllowMissingColumnNames is an illegal CSVFormat setting", 
exception.getMessage());
+  }
+
+  @Test
+  public void withAutoFlushThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            "autoFlush=true",
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withAutoFlush(true)));
+
+    assertEquals("withAutoFlush is an illegal CSVFormat setting", 
exception.getMessage());
+  }
+
+  @Test
+  public void withCommentMarkerDoesNotAffectConversion() {
+    Schema schema = Schema.of(Field.of("aString", FieldType.STRING));
+    Row row = Row.withSchema(schema).attachValues("$abc");
+    assertEquals("$abc", rowToCsv(row, 
csvFormat(schema).withCommentMarker('$')));
+    assertEquals("$abc", rowToCsv(row, 
csvFormat(schema).withCommentMarker(null)));
+  }
+
+  @Test
+  public void withDelimiterDrivesCellBorders() {
+    assertEquals(
+        "false~1~10~1.0~1.0~1~1~a~1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withDelimiter('~')));
+  }
+
+  @Test
+  public void withEscapeDrivesOutput() {
+    Schema schema =
+        Schema.of(Field.of("aString", FieldType.STRING), Field.of("anInt", 
FieldType.INT32));
+    Row row = Row.withSchema(schema).attachValues(",a", 1);
+    String[] header = new String[] {"anInt", "aString"};
+    assertEquals(
+        "1,#,a",
+        rowToCsv(
+            row,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withHeader(header)
+                .withEscape('#')
+                .withQuoteMode(QuoteMode.NONE)));
+    assertEquals(
+        "1,\",a\"", rowToCsv(row, 
csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withHeader(header)));
+  }
+
+  @Test
+  public void withHeaderDrivesFieldOrderSubsetOutput() {
+    assertEquals(
+        "1,false,a",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withHeader("anInteger", "aBoolean", "aString")));
+  }
+
+  @Test
+  public void withHeaderCommentsDoesNotAffectConversion() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withHeaderComments("some", "header", "comments")));
+  }
+
+  @Test
+  public void withIgnoreEmptyLinesDoesNotAffectOutput() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreEmptyLines(true)));
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreEmptyLines(false)));
+  }
+
+  @Test
+  public void withIgnoreHeaderCaseThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreHeaderCase(true)));
+    assertEquals("withIgnoreHeaderCase is an illegal CSVFormat setting", exception.getMessage());
+  }
+
+  @Test
+  public void withIgnoreSurroundingSpacesThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreSurroundingSpaces(true)));
+    assertEquals(
+        "withIgnoreSurroundingSpaces is an illegal CSVFormat setting", exception.getMessage());
+  }
+
+  @Test
+  public void withNullStringReplacesNullValues() {
+    assertEquals(
+        "🦄,🦄,🦄,🦄,🦄,🦄",
+        rowToCsv(
+            DATA.nullableTypesRowAllNull,
+            csvFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withNullString("🦄")));
+  }
+
+  @Test
+  public void withQuoteDrivesConversion() {
+    assertEquals(
+        "@false@,1,10,1.0,1.0,1,1,@a@,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withQuote('@')
+                .withQuoteMode(QuoteMode.NON_NUMERIC)));
+  }
+
+  @Test
+  public void withQuoteModeDrivesCellBoundaries() {
+    assertEquals(
+        "\"false\",\"1\",\"10\",\"1.0\",\"1.0\",\"1\",\"1\",\"a\",\"1\"",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withQuoteMode(QuoteMode.ALL)));
+    assertEquals(
+        "\"false\",1,10,1.0,1.0,1,1,\"a\",1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withQuoteMode(QuoteMode.NON_NUMERIC)));
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withQuoteMode(QuoteMode.MINIMAL)));
+    assertEquals(
+        ",,,,,",
+        rowToCsv(
+            DATA.nullableTypesRowAllNull,
+            csvFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withQuoteMode(QuoteMode.ALL_NON_NULL)));
+    assertEquals(
+        "\"true\",,,,\"a\",\"1\"",
+        rowToCsv(
+            DATA.nullableTypesRowSomeNull,
+            csvFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withQuoteMode(QuoteMode.ALL_NON_NULL)));
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withEscape('#')
+                .withQuoteMode(QuoteMode.NONE)));
+  }
+
+  @Test
+  public void withSystemRecordSeparatorDoesNotAffectOutput() {
+    assertEquals(
+        rowToCsv(DATA.allPrimitiveDataTypesRow, csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)),
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withSystemRecordSeparator()));
+  }
+
+  @Test
+  public void withTrailingDelimiterAppendsToLineEnd() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1,",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withTrailingDelimiter(true)));
+  }
+
+  @Test
+  public void withTrimRemovesCellPadding() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,\"       a           \",1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRowWithPadding, csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)));
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRowWithPadding,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withTrim(true)));
+  }
+
+  private static SerializableFunction<Row, String> rowToCsvFn(Schema schema, CSVFormat csvFormat) {
+    return CsvRowConversions.RowToCsv.builder().setCSVFormat(csvFormat).setSchema(schema).build();
+  }
+
+  private static String rowToCsv(Row row, CSVFormat csvFormat) {
+    Schema schema = checkNotNull(row.getSchema());
+    return rowToCsvFn(schema, csvFormat).apply(row);
+  }
+
+  private static CSVFormat csvFormat(Schema schema) {
+    return CSVFormat.DEFAULT.withHeader(schema.sorted().getFieldNames().toArray(new String[0]));
+  }
+}
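
For readers skimming the diff, here is a minimal standalone sketch of the
conversion these tests exercise. The schema, row, and values are illustrative
assumptions, not the test fixtures; the builder calls themselves come from the
code above:

    Schema schema = Schema.of(Field.of("aString", FieldType.STRING));
    Row row = Row.withSchema(schema).attachValues("hello");
    // RowToCsv builds a SerializableFunction<Row, String>; a CSVFormat header
    // is required and drives which columns are written and in what order.
    SerializableFunction<Row, String> fn =
        CsvRowConversions.RowToCsv.builder()
            .setCSVFormat(CSVFormat.DEFAULT.withHeader("aString"))
            .setSchema(schema)
            .build();
    String csv = fn.apply(row); // yields "hello"
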
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvPredefinedCSVFormatsTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvPredefinedCSVFormatsTest.java
new file mode 100644
index 00000000000..91dcaec322b
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvPredefinedCSVFormatsTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.csv.CsvRowConversions.RowToCsv;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link org.apache.beam.sdk.io.csv.CsvRowConversions.RowToCsv} with {@link
+ * org.apache.commons.csv.CSVFormat.Predefined} types.
+ */
+@RunWith(JUnit4.class)
+public class RowToCsvPredefinedCSVFormatsTest {
+  @Test
+  public void defaultFormat() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        defaultFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        defaultFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        defaultFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        defaultFormat(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void excel() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        excel(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        excel(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        excel(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        excel(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void informixUnload() {
+    assertEquals(
+        "false|1|10|1.0|1.0|1|1|a|1",
+        informixUnload(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "|||||",
+        informixUnload(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true||||a|1",
+        informixUnload(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        informixUnload(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void informixUnloadCsv() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        informixUnloadCSV(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        informixUnloadCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        informixUnloadCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        informixUnloadCSV(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void mySql() {
+    assertEquals(
+        "false\t1\t10\t1.0\t1.0\t1\t1\ta\t1",
+        mySql(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "\\N\t\\N\t\\N\t\\N\t\\N\t\\N",
+        mySql(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true\t\\N\t\\N\t\\N\ta\t1",
+        mySql(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        mySql(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void rfc4180() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rfc4180(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        rfc4180(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        rfc4180(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        rfc4180(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void oracle() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        oracle(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "\\N,\\N,\\N,\\N,\\N,\\N",
+        oracle(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,\\N,\\N,\\N,a,1",
+        oracle(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        oracle(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void postgresqlCSV() {
+    assertEquals(
+        "\"false\",\"1\",\"10\",\"1.0\",\"1.0\",\"1\",\"1\",\"a\",\"1\"",
+        postgresqlCSV(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        postgresqlCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "\"true\",,,,\"a\",\"1\"",
+        postgresqlCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "\"1970-01-01T00:00:00.001Z\"",
+        postgresqlCSV(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void postgresqlText() {
+    assertEquals(
+        "\"false\"\t\"1\"\t\"10\"\t\"1.0\"\t\"1.0\"\t\"1\"\t\"1\"\t\"a\"\t\"1\"",
+        postgresqlText(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "\\N\t\\N\t\\N\t\\N\t\\N\t\\N",
+        postgresqlText(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "\"true\"\t\\N\t\\N\t\\N\t\"a\"\t\"1\"",
+        postgresqlText(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "\"1970-01-01T00:00:00.001Z\"",
+        postgresqlText(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void tdf() {
+    assertEquals(
+        "false\t1\t10\t1.0\t1.0\t1\t1\ta\t1",
+        tdf(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "", tdf(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true\t\t\t\ta\t1",
+        tdf(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        tdf(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  private static RowToCsv defaultFormat(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.DEFAULT, schema, header);
+  }
+
+  private static RowToCsv excel(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.EXCEL.withAllowMissingColumnNames(false), schema, header);
+  }
+
+  private static RowToCsv informixUnload(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.INFORMIX_UNLOAD, schema, header);
+  }
+
+  private static RowToCsv informixUnloadCSV(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.INFORMIX_UNLOAD_CSV, schema, header);
+  }
+
+  private static RowToCsv mySql(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.MYSQL, schema, header);
+  }
+
+  private static RowToCsv rfc4180(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.RFC4180, schema, header);
+  }
+
+  private static RowToCsv oracle(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.ORACLE, schema, header);
+  }
+
+  private static RowToCsv postgresqlCSV(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.POSTGRESQL_CSV, schema, header);
+  }
+
+  private static RowToCsv postgresqlText(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.POSTGRESQL_TEXT, schema, header);
+  }
+
+  private static RowToCsv tdf(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.TDF.withIgnoreSurroundingSpaces(false), schema, header);
+  }
+
+  private static RowToCsv rowToCsv(CSVFormat csvFormat, Schema schema, String... header) {
+    if (header.length == 0) {
+      header = schema.sorted().getFieldNames().toArray(new String[0]);
+    }
+    return RowToCsv.builder().setCSVFormat(csvFormat.withHeader(header)).setSchema(schema).build();
+  }
+}
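
As the expected strings above show, the predefined formats differ mainly in
delimiter, quoting, and null representation. A short sketch of swapping one in
(the header field names are assumed to exist in the schema):

    // CSVFormat.MYSQL delimits with tabs and writes null values as \N.
    RowToCsv toMySqlCsv =
        RowToCsv.builder()
            .setCSVFormat(CSVFormat.MYSQL.withHeader("aBoolean", "aString"))
            .setSchema(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
            .build();
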
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 454ba5cb64b..4455981a3fd 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -184,6 +184,7 @@ include(":sdks:java:io:expansion-service")
 include(":sdks:java:io:file-based-io-tests")
 include(":sdks:java:io:bigquery-io-perf-tests")
 include(":sdks:java:io:cdap")
+include(":sdks:java:io:csv")
 include(":sdks:java:io:fileschematransform")
 include(":sdks:java:io:google-cloud-platform")
 include(":sdks:java:io:google-cloud-platform:expansion-service")
