lukecwik commented on code in PR #24630:
URL: https://github.com/apache/beam/pull/24630#discussion_r1046193793


##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */

Review Comment:
   Ditto for `<pre>{@code ...`



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   Could we resolve the nullness concerns in new code?



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+       * file prior to the header. In the example below, all the text 
preceding the header
+       * 'column1,column2,column3' is the preamble.
+       *
+       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+       * John Doe
+       *
+       * <p>column1,column2,column3 1,2,3 4,5,6
+       */
+      abstract Builder<T> setPreamble(String value);
+
+      /**
+       * The column names of the CSV file written at the top line of each 
shard after the preamble,
+       * if available.
+       */
+      abstract Builder<T> setHeader(String value);
+
+      /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+      abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value);
+
+      abstract Sink<T> build();
+    }
+  }
+
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone>
+      implements HasDisplayData {
+
+    /** Specifies a common prefix for all generated files. */
+    public Write<T> to(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /** The {@link CSVFormat} of the destination CSV file data. */
+    public Write<T> withCSVFormat(CSVFormat format) {
+      return toBuilder().setCSVFormat(format).build();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:

Review Comment:
   `<pre>{@code....`



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+       * file prior to the header. In the example below, all the text 
preceding the header
+       * 'column1,column2,column3' is the preamble.
+       *
+       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+       * John Doe
+       *
+       * <p>column1,column2,column3 1,2,3 4,5,6
+       */
+      abstract Builder<T> setPreamble(String value);
+
+      /**
+       * The column names of the CSV file written at the top line of each 
shard after the preamble,
+       * if available.
+       */
+      abstract Builder<T> setHeader(String value);
+
+      /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+      abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value);
+
+      abstract Sink<T> build();
+    }
+  }
+
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone>
+      implements HasDisplayData {
+
+    /** Specifies a common prefix for all generated files. */
+    public Write<T> to(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /** The {@link CSVFormat} of the destination CSV file data. */
+    public Write<T> withCSVFormat(CSVFormat format) {
+      return toBuilder().setCSVFormat(format).build();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Write<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * FileIO.Write#withNumShards(int)} for details.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setNumShards(numShards).build();
+    }
+
+    /**
+     * Specifies a directory into which all temporary files will be placed. 
See {@link
+     * FileIO.Write#withTempDirectory(String)}.
+     */
+    public Write<T> withTempDirectory(ResourceId value) {
+      return toBuilder().setTempDirectory(value).build();
+    }
+
+    abstract @Nullable String getFilenamePrefix();
+
+    abstract CSVFormat getCSVFormat();
+
+    abstract @Nullable String getPreamble();
+
+    abstract @Nullable Compression getCompression();
+
+    abstract @Nullable Integer getNumShards();
+
+    abstract String getFilenameSuffix();
+
+    abstract @Nullable ResourceId getTempDirectory();
+
+    abstract @Nullable List<String> getSchemaFields();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setFilenamePrefix(String value);
+
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract Optional<CSVFormat> getCSVFormat();
+
+      abstract Builder<T> setPreamble(String value);
+
+      abstract Builder<T> setCompression(Compression value);
+
+      abstract Builder<T> setNumShards(Integer value);
+
+      abstract Builder<T> setFilenameSuffix(String value);
+
+      abstract Optional<String> getFilenameSuffix();
+
+      abstract Builder<T> setTempDirectory(ResourceId value);
+
+      abstract Builder<T> setSchemaFields(List<String> value);
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        if (!getCSVFormat().isPresent()) {
+          setCSVFormat(CSVFormat.DEFAULT);
+        }
+
+        if (!getFilenameSuffix().isPresent()) {
+          setFilenameSuffix(DEFAULT_FILENAME_SUFFIX);
+        }
+
+        return autoBuild();
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      if (getFilenamePrefix() != null) {
+        builder.add(DisplayData.item("filenamePrefix", getFilenamePrefix()));
+      }
+      if (getCSVFormat() != null) {
+        builder.add(DisplayData.item("csvFormat", getCSVFormat().toString()));
+      }
+      if (getPreamble() != null) {
+        builder.add(DisplayData.item("preamble", getPreamble()));
+      }
+      if (getCompression() != null) {
+        builder.add(DisplayData.item("compression", getCompression().name()));
+      }
+      if (getNumShards() != null) {
+        builder.add(DisplayData.item("numShards", getNumShards()));
+      }
+
+      builder.add(DisplayData.item("filenameSuffix", getFilenameSuffix()));
+
+      if (getTempDirectory() != null) {
+        builder.add(DisplayData.item("tempDirectory", 
getTempDirectory().getFilename()));
+      }
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (!input.hasSchema()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s requires an input Schema. Note that only Row or user 
classes are supported. Consider using TextIO or FileIO directly when writing 
primitive types",
+                Write.class.getName()));
+      }
+
+      Schema schema = input.getSchema();
+      SerializableFunction<T, Row> toRowFn = input.getToRowFunction();
+      PCollection<Row> rows =
+          
input.apply(MapElements.into(rows()).via(toRowFn)).setRowSchema(schema);
+      rows.apply("writeRowsToCsv", buildFileIOWrite().via(buildSink(schema)));

Review Comment:
   ```suggestion
         rows.apply("Write Rows To Csv", 
buildFileIOWrite().via(buildSink(schema)));
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvPayloadSerializer.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Implementation of {@link PayloadSerializer} for a CSV format. */
+public class CsvPayloadSerializer implements PayloadSerializer {
+
+  private final CSVFormat csvFormat;
+  private final List<String> schemaFields;
+
+  /**
+   * Instantiates a {@link PayloadSerializer} for a CSV format. {@param 
csvFormat} defaults to
+   * {@link CSVFormat#DEFAULT} and {@param schemaFields} defaults to {@link 
Schema#sorted()} {@link
+   * Schema#getFieldNames()}.
+   */
+  CsvPayloadSerializer(
+      Schema schema, @Nullable CSVFormat csvFormat, @Nullable List<String> 
schemaFields) {
+    CsvUtils.validateSchema(schema);
+    CsvUtils.validateHeaderAgainstSchema(schemaFields, schema);
+    if (csvFormat == null) {
+      csvFormat = CSVFormat.DEFAULT;
+    }
+    this.csvFormat = csvFormat;
+
+    if (schemaFields == null) {
+      schemaFields = schema.sorted().getFieldNames();
+    }
+    this.schemaFields = schemaFields;
+  }
+
+  CSVFormat getCsvFormat() {
+    return csvFormat;
+  }
+
+  List<String> getSchemaFields() {
+    return schemaFields;
+  }
+
+  /**
+   * Serializes a {@link Row} to a CSV byte[] record using a {@link 
CSVFormat}. The schemaFields
+   * provided or generated by the constructor drive the subset and order of 
serialized {@link Row}
+   * fields.
+   */
+  @Override
+  public byte[] serialize(Row row) {
+    StringBuilder builder = new StringBuilder();
+    try {
+      boolean newRecord = true;
+      for (String name : schemaFields) {
+        Object value = row.getValue(name);
+        if (value == null) {

Review Comment:
   I would have expected the CSVFormat to convert nulls based on 
https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Builder.html#setNullString-java.lang.String-



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+       * file prior to the header. In the example below, all the text 
proceeding the header
+       * 'column1,column2,column3' is the preamble.
+       *
+       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+       * John Doe
+       *
+       * <p>column1,column2,column3 1,2,3 4,5,6
+       */
+      abstract Builder<T> setPreamble(String value);
+
+      /**
+       * The column names of the CSV file written at the top line of each 
shard after the preamble,
+       * if available.
+       */
+      abstract Builder<T> setHeader(String value);
+
+      /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+      abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value);
+
+      abstract Sink<T> build();
+    }
+  }
+
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone>
+      implements HasDisplayData {
+
+    /** Specifies a common prefix for all generated files. */
+    public Write<T> to(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /** The {@link CSVFormat} of the destination CSV file data. */
+    public Write<T> withCSVFormat(CSVFormat format) {
+      return toBuilder().setCSVFormat(format).build();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Write<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * FileIO.Write#withNumShards(int)} for details.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setNumShards(numShards).build();
+    }
+
+    /**
+     * Specifies a directory into which all temporary files will be placed. 
See {@link
+     * FileIO.Write#withTempDirectory(String)}.
+     */
+    public Write<T> withTempDirectory(ResourceId value) {
+      return toBuilder().setTempDirectory(value).build();
+    }
+
+    abstract @Nullable String getFilenamePrefix();
+
+    abstract CSVFormat getCSVFormat();
+
+    abstract @Nullable String getPreamble();
+
+    abstract @Nullable Compression getCompression();
+
+    abstract @Nullable Integer getNumShards();
+
+    abstract String getFilenameSuffix();
+
+    abstract @Nullable ResourceId getTempDirectory();
+
+    abstract @Nullable List<String> getSchemaFields();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setFilenamePrefix(String value);
+
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract Optional<CSVFormat> getCSVFormat();
+
+      abstract Builder<T> setPreamble(String value);
+
+      abstract Builder<T> setCompression(Compression value);
+
+      abstract Builder<T> setNumShards(Integer value);
+
+      abstract Builder<T> setFilenameSuffix(String value);
+
+      abstract Optional<String> getFilenameSuffix();
+
+      abstract Builder<T> setTempDirectory(ResourceId value);
+
+      abstract Builder<T> setSchemaFields(List<String> value);
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        if (!getCSVFormat().isPresent()) {
+          setCSVFormat(CSVFormat.DEFAULT);
+        }
+
+        if (!getFilenameSuffix().isPresent()) {
+          setFilenameSuffix(DEFAULT_FILENAME_SUFFIX);

Review Comment:
   Ditto for the filename suffix: always start by setting our default, and then 
allow users to override it if they want.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvPayloadSerializer.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Implementation of {@link PayloadSerializer} for a CSV format. */
+public class CsvPayloadSerializer implements PayloadSerializer {
+
+  private final CSVFormat csvFormat;
+  private final List<String> schemaFields;
+
+  /**
+   * Instantiates a {@link PayloadSerializer} for a CSV format. {@param 
csvFormat} defaults to
+   * {@link CSVFormat#DEFAULT} and {@param schemaFields} defaults to {@link 
Schema#sorted()} {@link
+   * Schema#getFieldNames()}.
+   */
+  CsvPayloadSerializer(
+      Schema schema, @Nullable CSVFormat csvFormat, @Nullable List<String> 
schemaFields) {
+    CsvUtils.validateSchema(schema);
+    CsvUtils.validateHeaderAgainstSchema(schemaFields, schema);
+    if (csvFormat == null) {
+      csvFormat = CSVFormat.DEFAULT;
+    }
+    this.csvFormat = csvFormat;
+
+    if (schemaFields == null) {
+      schemaFields = schema.sorted().getFieldNames();
+    }
+    this.schemaFields = schemaFields;
+  }
+
+  CSVFormat getCsvFormat() {
+    return csvFormat;
+  }
+
+  List<String> getSchemaFields() {
+    return schemaFields;
+  }
+
+  /**
+   * Serializes a {@link Row} to a CSV byte[] record using a {@link 
CSVFormat}. The schemaFields
+   * provided or generated by the constructor drive the subset and order of 
serialized {@link Row}
+   * fields.
+   */
+  @Override
+  public byte[] serialize(Row row) {
+    StringBuilder builder = new StringBuilder();
+    try {
+      boolean newRecord = true;
+      for (String name : schemaFields) {
+        Object value = row.getValue(name);
+        if (value == null) {
+          value = "";
+        }
+        csvFormat.print(value, builder, newRecord);
+        newRecord = false;
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+    return builder.toString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Converts {@param bytes} to a {@link Row} based on the {@link CSVFormat} 
and schemaFields
+   * parameter in the constructor. Not yet implemented. See
+   * https://github.com/apache/beam/issues/24552.

Review Comment:
   Use an `<a href=...>` tag for the issue link instead of a bare URL.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvUtils.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Utilities to convert between CSV records, Beam rows, and user types. */
+public final class CsvUtils {
+
+  public static final Set<FieldType> VALID_FIELD_TYPE_SET =

Review Comment:
   We'll want to describe, in the CsvIO class-level Javadoc, the field types that are supported 
when converting to/from a schema.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6

Review Comment:
   Consider using `<pre>{@code ...}` for the example output.
   
   See 
https://github.com/robertwb/incubator-beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L93
 for an example of how to format this.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+       * file prior to the header. In the example below, all the text 
proceeding the header
+       * 'column1,column2,column3' is the preamble.
+       *
+       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+       * John Doe
+       *
+       * <p>column1,column2,column3 1,2,3 4,5,6

Review Comment:
   Ditto: use `<pre>{@code ...}` here as well.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+       * file prior to the header. In the example below, all the text 
proceeding the header
+       * 'column1,column2,column3' is the preamble.
+       *
+       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+       * John Doe
+       *
+       * <p>column1,column2,column3 1,2,3 4,5,6
+       */
+      abstract Builder<T> setPreamble(String value);
+
+      /**
+       * The column names of the CSV file written at the top line of each 
shard after the preamble,
+       * if available.
+       */
+      abstract Builder<T> setHeader(String value);
+
+      /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+      abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value);
+
+      abstract Sink<T> build();
+    }
+  }
+
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone>
+      implements HasDisplayData {
+
+    /** Specifies a common prefix for all generated files. */
+    public Write<T> to(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /** The {@link CSVFormat} of the destination CSV file data. */
+    public Write<T> withCSVFormat(CSVFormat format) {
+      return toBuilder().setCSVFormat(format).build();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Write<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * FileIO.Write#withNumShards(int)} for details.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setNumShards(numShards).build();
+    }
+
+    /**
+     * Specifies a directory into which all temporary files will be placed. 
See {@link
+     * FileIO.Write#withTempDirectory(String)}.
+     */
+    public Write<T> withTempDirectory(ResourceId value) {
+      return toBuilder().setTempDirectory(value).build();
+    }
+
+    abstract @Nullable String getFilenamePrefix();
+
+    abstract CSVFormat getCSVFormat();
+
+    abstract @Nullable String getPreamble();
+
+    abstract @Nullable Compression getCompression();
+
+    abstract @Nullable Integer getNumShards();
+
+    abstract String getFilenameSuffix();
+
+    abstract @Nullable ResourceId getTempDirectory();
+
+    abstract @Nullable List<String> getSchemaFields();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setFilenamePrefix(String value);
+
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract Optional<CSVFormat> getCSVFormat();
+
+      abstract Builder<T> setPreamble(String value);
+
+      abstract Builder<T> setCompression(Compression value);
+
+      abstract Builder<T> setNumShards(Integer value);
+
+      abstract Builder<T> setFilenameSuffix(String value);
+
+      abstract Optional<String> getFilenameSuffix();
+
+      abstract Builder<T> setTempDirectory(ResourceId value);
+
+      abstract Builder<T> setSchemaFields(List<String> value);
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        if (!getCSVFormat().isPresent()) {
+          setCSVFormat(CSVFormat.DEFAULT);
+        }
+
+        if (!getFilenameSuffix().isPresent()) {
+          setFilenameSuffix(DEFAULT_FILENAME_SUFFIX);
+        }
+
+        return autoBuild();
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      if (getFilenamePrefix() != null) {
+        builder.add(DisplayData.item("filenamePrefix", getFilenamePrefix()));
+      }
+      if (getCSVFormat() != null) {
+        builder.add(DisplayData.item("csvFormat", getCSVFormat().toString()));
+      }
+      if (getPreamble() != null) {
+        builder.add(DisplayData.item("preamble", getPreamble()));
+      }
+      if (getCompression() != null) {
+        builder.add(DisplayData.item("compression", getCompression().name()));
+      }
+      if (getNumShards() != null) {
+        builder.add(DisplayData.item("numShards", getNumShards()));
+      }
+
+      builder.add(DisplayData.item("filenameSuffix", getFilenameSuffix()));
+
+      if (getTempDirectory() != null) {
+        builder.add(DisplayData.item("tempDirectory", 
getTempDirectory().getFilename()));
+      }
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (!input.hasSchema()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s requires an input Schema. Note that only Row or user 
classes are supported. Consider using TextIO or FileIO directly when writing 
primitive types",
+                Write.class.getName()));
+      }
+
+      Schema schema = input.getSchema();
+      SerializableFunction<T, Row> toRowFn = input.getToRowFunction();
+      PCollection<Row> rows =
+          
input.apply(MapElements.into(rows()).via(toRowFn)).setRowSchema(schema);

Review Comment:
   It is always good to give names for each PTransform that is applied so that 
it is easier to understand what is occurring.
   
   ```suggestion
             input.apply("To Rows", 
MapElements.into(rows()).via(toRowFn)).setRowSchema(schema);
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();

Review Comment:
   Use SerializableFunction or ProcessFunction depending on whether you want to 
allow the user to throw an exception or not.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+       * file prior to the header. In the example below, all the text 
proceeding the header
+       * 'column1,column2,column3' is the preamble.
+       *
+       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+       * John Doe
+       *
+       * <p>column1,column2,column3 1,2,3 4,5,6
+       */
+      abstract Builder<T> setPreamble(String value);
+
+      /**
+       * The column names of the CSV file written at the top line of each 
shard after the preamble,
+       * if available.
+       */
+      abstract Builder<T> setHeader(String value);
+
+      /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+      abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value);
+
+      abstract Sink<T> build();
+    }
+  }
+
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone>
+      implements HasDisplayData {
+
+    /** Specifies a common prefix for all generated files. */
+    public Write<T> to(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /** The {@link CSVFormat} of the destination CSV file data. */
+    public Write<T> withCSVFormat(CSVFormat format) {
+      return toBuilder().setCSVFormat(format).build();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text proceeding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Write<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * FileIO.Write#withNumShards(int)} for details.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setNumShards(numShards).build();
+    }
+
+    /**
+     * Specifies a directory into which all temporary files will be placed. 
See {@link
+     * FileIO.Write#withTempDirectory(String)}.
+     */
+    public Write<T> withTempDirectory(ResourceId value) {
+      return toBuilder().setTempDirectory(value).build();
+    }
+
+    abstract @Nullable String getFilenamePrefix();
+
+    abstract CSVFormat getCSVFormat();
+
+    abstract @Nullable String getPreamble();
+
+    abstract @Nullable Compression getCompression();
+
+    abstract @Nullable Integer getNumShards();
+
+    abstract String getFilenameSuffix();
+
+    abstract @Nullable ResourceId getTempDirectory();
+
+    abstract @Nullable List<String> getSchemaFields();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setFilenamePrefix(String value);
+
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract Optional<CSVFormat> getCSVFormat();
+
+      abstract Builder<T> setPreamble(String value);
+
+      abstract Builder<T> setCompression(Compression value);
+
+      abstract Builder<T> setNumShards(Integer value);
+
+      abstract Builder<T> setFilenameSuffix(String value);
+
+      abstract Optional<String> getFilenameSuffix();
+
+      abstract Builder<T> setTempDirectory(ResourceId value);
+
+      abstract Builder<T> setSchemaFields(List<String> value);
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        if (!getCSVFormat().isPresent()) {
+          setCSVFormat(CSVFormat.DEFAULT);

Review Comment:
   Do you differentiate between the user setting CSVFormat.DEFAULT and us 
setting it?
   
   If not, consider dropping Optional and ensuring that we set this everywhere 
when constructing the builder.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvPayloadSerializer.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Implementation of {@link PayloadSerializer} for a CSV format. */
+public class CsvPayloadSerializer implements PayloadSerializer {
+
+  private final CSVFormat csvFormat;
+  private final List<String> schemaFields;
+
+  /**
+   * Instantiates a {@link PayloadSerializer} for a CSV format. {@param 
csvFormat} defaults to
+   * {@link CSVFormat#DEFAULT} and {@param schemaFields} defaults to {@link 
Schema#sorted()} {@link
+   * Schema#getFieldNames()}.
+   */
+  CsvPayloadSerializer(

Review Comment:
   This is easier to reason about if the constructor takes only non-nullable fields and applies 
the defaults itself, OR if the caller owns the Schema->fields conversion and passes in 
CSVFormat#DEFAULT explicitly.
   
   Note: will CSVFormat ever actually be null?



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */
+@SuppressWarnings({
+  "unused",
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CsvIO {
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in CSV format. */
+  public static <T> Write<T> write() {
+    return new AutoValue_CsvIO_Write.Builder<T>().build();
+  }
+
+  /** Instantiates a {@link Write} for {@link Row}s in CSV format. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_CsvIO_Write.Builder<Row>().build();
+  }
+
+  /** Implementation of {@link FileIO.Sink}. */
+  @AutoValue
+  public abstract static class Sink<T> implements FileIO.Sink<T> {
+
+    public static <T> Builder<T> builder() {
+      return new AutoValue_CsvIO_Sink.Builder<>();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Sink<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    private transient @Nullable PrintWriter writer;
+
+    /**
+     * Opens a {@link WritableByteChannel} for writing CSV files. Writes the 
{@link #getPreamble()}
+     * if available followed by the {@link #getHeader()}.
+     */
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer =
+          new PrintWriter(
+              new BufferedWriter(new 
OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
+      if (getPreamble() != null) {
+        writer.println(getPreamble());
+      }
+      writer.println(getHeader());
+    }
+
+    /** Serializes and writes the {@param element} to a file. */
+    @Override
+    public void write(T element) throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      String line = getFormatFunction().apply(element);
+      writer.println(line);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (writer == null) {
+        throw new IllegalStateException(
+            String.format("%s writer is null", PrintWriter.class.getName()));
+      }
+      writer.flush();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    abstract @Nullable String getPreamble();
+
+    /**
+     * The column names of the CSV file written at the top line of each shard 
after the preamble, if
+     * available.
+     */
+    abstract String getHeader();
+
+    /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+    abstract SimpleFunction<T, String> getFormatFunction();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+       * file prior to the header. In the example below, all the text 
preceding the header
+       * 'column1,column2,column3' is the preamble.
+       *
+       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+       * John Doe
+       *
+       * <p>column1,column2,column3 1,2,3 4,5,6
+       */
+      abstract Builder<T> setPreamble(String value);
+
+      /**
+       * The column names of the CSV file written at the top line of each 
shard after the preamble,
+       * if available.
+       */
+      abstract Builder<T> setHeader(String value);
+
+      /** A {@link SimpleFunction} for converting a {@param T} to a CSV 
formatted string. */
+      abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value);
+
+      abstract Sink<T> build();
+    }
+  }
+
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone>
+      implements HasDisplayData {
+
+    /** Specifies a common prefix for all generated files. */
+    public Write<T> to(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /** The {@link CSVFormat} of the destination CSV file data. */
+    public Write<T> withCSVFormat(CSVFormat format) {
+      return toBuilder().setCSVFormat(format).build();
+    }
+
+    /**
+     * Not to be confused with the CSV header, it is content written to the 
top of every sharded
+     * file prior to the header. In the example below, all the text preceding 
the header
+     * 'column1,column2,column3' is the preamble.
+     *
+     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 
2022-12-05 Operator:
+     * John Doe
+     *
+     * <p>column1,column2,column3 1,2,3 4,5,6
+     */
+    public Write<T> withPreamble(String preamble) {
+      return toBuilder().setPreamble(preamble).build();
+    }
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setCompression(compression).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * FileIO.Write#withNumShards(int)} for details.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setNumShards(numShards).build();
+    }
+
+    /**
+     * Specifies a directory into which all temporary files will be placed. 
See {@link
+     * FileIO.Write#withTempDirectory(String)}.
+     */
+    public Write<T> withTempDirectory(ResourceId value) {
+      return toBuilder().setTempDirectory(value).build();
+    }
+
+    abstract @Nullable String getFilenamePrefix();
+
+    abstract CSVFormat getCSVFormat();
+
+    abstract @Nullable String getPreamble();
+
+    abstract @Nullable Compression getCompression();
+
+    abstract @Nullable Integer getNumShards();
+
+    abstract String getFilenameSuffix();
+
+    abstract @Nullable ResourceId getTempDirectory();
+
+    abstract @Nullable List<String> getSchemaFields();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setFilenamePrefix(String value);
+
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract Optional<CSVFormat> getCSVFormat();
+
+      abstract Builder<T> setPreamble(String value);
+
+      abstract Builder<T> setCompression(Compression value);
+
+      abstract Builder<T> setNumShards(Integer value);
+
+      abstract Builder<T> setFilenameSuffix(String value);
+
+      abstract Optional<String> getFilenameSuffix();
+
+      abstract Builder<T> setTempDirectory(ResourceId value);
+
+      abstract Builder<T> setSchemaFields(List<String> value);
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        if (!getCSVFormat().isPresent()) {
+          setCSVFormat(CSVFormat.DEFAULT);
+        }
+
+        if (!getFilenameSuffix().isPresent()) {
+          setFilenameSuffix(DEFAULT_FILENAME_SUFFIX);
+        }
+
+        return autoBuild();
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      if (getFilenamePrefix() != null) {
+        builder.add(DisplayData.item("filenamePrefix", getFilenamePrefix()));
+      }
+      if (getCSVFormat() != null) {
+        builder.add(DisplayData.item("csvFormat", getCSVFormat().toString()));
+      }
+      if (getPreamble() != null) {
+        builder.add(DisplayData.item("preamble", getPreamble()));
+      }
+      if (getCompression() != null) {
+        builder.add(DisplayData.item("compression", getCompression().name()));
+      }
+      if (getNumShards() != null) {
+        builder.add(DisplayData.item("numShards", getNumShards()));
+      }
+
+      builder.add(DisplayData.item("filenameSuffix", getFilenameSuffix()));
+
+      if (getTempDirectory() != null) {
+        builder.add(DisplayData.item("tempDirectory", 
getTempDirectory().getFilename()));
+      }
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (!input.hasSchema()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s requires an input Schema. Note that only Row or user 
classes are supported. Consider using TextIO or FileIO directly when writing 
primitive types",
+                Write.class.getName()));
+      }
+
+      Schema schema = input.getSchema();
+      SerializableFunction<T, Row> toRowFn = input.getToRowFunction();
+      PCollection<Row> rows =
+          
input.apply(MapElements.into(rows()).via(toRowFn)).setRowSchema(schema);
+      rows.apply("writeRowsToCsv", buildFileIOWrite().via(buildSink(schema)));
+      return PDone.in(input.getPipeline());
+    }
+
+    /**
+     * Builds a header using {@link CSVFormat} based on either a {@link 
Schema#sorted()} {@link
+     * Schema#getFieldNames()} if {@link #getSchemaFields()} is null or {@link 
#getSchemaFields()}.
+     */
+    String buildHeader(Schema schema) {
+      if (getSchemaFields() != null) {
+        return CsvUtils.buildHeaderFrom(getSchemaFields(), getCSVFormat());
+      }
+      return CsvUtils.buildHeaderFrom(schema.sorted(), getCSVFormat());
+    }
+
+    /** Builds a {@link Sink} for writing {@link Row} serialized using {@link 
CSVFormat}. */
+    Sink<Row> buildSink(Schema schema) {
+      List<String> schemaFields = null;
+      String header = null;
+      if (getSchemaFields() != null) {
+        schemaFields = getSchemaFields();
+      }
+      return Sink.<Row>builder()
+          .setPreamble(getPreamble())
+          .setHeader(buildHeader(schema))
+          .setFormatFunction(
+              CsvUtils.getRowToCsvStringFunction(schema, getCSVFormat(), 
schemaFields))
+          .build();
+    }
+
+    /** Builds a {@link FileIO.Write} with a {@link Sink}. */
+    FileIO.Write<Void, Row> buildFileIOWrite() {
+      checkArgument(getFilenamePrefix() != null, "to() is required");
+
+      ResourceId prefix =
+          FileSystems.matchNewResource(getFilenamePrefix(), false /* 
isDirectory */);
+
+      FileIO.Write<Void, Row> write =
+          FileIO.<Row>write()
+              .to(prefix.getCurrentDirectory().toString())
+              .withPrefix(Objects.requireNonNull(prefix.getFilename()));
+
+      if (getCompression() != null) {
+        write = write.withCompression(getCompression());
+      }
+
+      if (getNumShards() != null) {
+        write = write.withNumShards(getNumShards());
+      }
+
+      if (getFilenameSuffix() != null) {
+        write = write.withSuffix(getFilenameSuffix());
+      }
+
+      if (getTempDirectory() != null) {
+        write = 
write.withTempDirectory(Objects.requireNonNull(getTempDirectory().getFilename()));
+      }
+
+      return write;
+    }

Review Comment:
   nit: Consider inlining these into expand(), since these are small methods and 
we don't expect them to be used outside of expand().



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvUtils.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Utilities to convert between CSV records, Beam rows, and user types. */
+public final class CsvUtils {

Review Comment:
   Do we want this to be public?



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvUtils.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Utilities to convert between CSV records, Beam rows, and user types. */
+public final class CsvUtils {
+
+  public static final Set<FieldType> VALID_FIELD_TYPE_SET =
+      ImmutableSet.of(
+          FieldType.BYTE,
+          FieldType.BOOLEAN,
+          FieldType.DATETIME,
+          FieldType.DECIMAL,
+          FieldType.DOUBLE,
+          FieldType.INT16,
+          FieldType.INT32,
+          FieldType.INT64,
+          FieldType.FLOAT,
+          FieldType.STRING);
+
+  /**
+   * Planned for: TODO(https://github.com/apache/beam/issues/24552) Returns a 
{@link SimpleFunction}
+   * that converts a CSV byte[] record to a Beam {@link Row}. Providing 
{@param schemaFields}
+   * determines the subset and order of {@param beamSchema} fields expected 
from the CSV byte[]
+   * record. Otherwise, the expected order derives from the {@link 
Schema#sorted()} field order.
+   */
+  public static SimpleFunction<byte[], Row> getCsvBytesToRowFunction(
+      Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> 
schemaFields) {
+    CsvPayloadSerializer payloadSerializer =
+        new CsvPayloadSerializer(beamSchema, format, schemaFields);
+    return new CsvBytesToRowFn(payloadSerializer);
+  }
+
+  /**
+   * Planned for: TODO(https://github.com/apache/beam/issues/24552) Returns a 
{@link SimpleFunction}
+   * that converts a CSV String record to a Beam {@link Row}. Providing 
{@param schemaFields}
+   * determines the subset and order of {@param beamSchema} fields expected 
from the CSV String
+   * record. Otherwise, the expected order derives from the {@link 
Schema#sorted()} field order.
+   */
+  public static SimpleFunction<String, Row> getCsvStringToRowFunction(
+      Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> 
schemaFields) {
+    CsvPayloadSerializer payloadSerializer =
+        new CsvPayloadSerializer(beamSchema, format, schemaFields);
+    return new CsvStringToRowFn(payloadSerializer);
+  }
+
+  /**
+   * Returns a {@link SimpleFunction} that converts a {@link Row} to a CSV 
byte[] record. Providing
+   * {@param schemaFields} determines the subset and order of {@param 
beamSchema} fields of the
+   * resulting CSV byte[] record. Otherwise, the order derives from the {@link 
Schema#sorted()}
+   * field order.
+   */
+  public static SimpleFunction<Row, byte[]> getRowToCsvBytesFunction(
+      Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> 
schemaFields) {
+    CsvPayloadSerializer payloadSerializer =
+        new CsvPayloadSerializer(beamSchema, format, schemaFields);
+    return new RowToCsvBytesFn(payloadSerializer);
+  }
+
+  /**
+   * Returns a {@link SimpleFunction} that converts a {@link Row} to a CSV 
String record. Providing
+   * {@param schemaFields} determines the subset and order of {@param 
beamSchema} fields of the
+   * resulting CSV String record. Otherwise, the order derives from the {@link 
Schema#sorted()}
+   * field order.
+   */
+  public static SimpleFunction<Row, String> getRowToCsvStringFunction(
+      Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> 
schemaFields) {
+
+    CsvPayloadSerializer payloadSerializer =
+        new CsvPayloadSerializer(beamSchema, format, schemaFields);
+    return new RowToCsvStringFn(payloadSerializer);
+  }
+
+  /** Formats {@param columns} into a header String based on {@link 
CSVFormat}. */
+  static String buildHeaderFrom(List<String> columns, CSVFormat csvFormat) {
+    StringBuilder builder = new StringBuilder();
+    try {
+      boolean newRecord = true;
+      for (String name : columns) {
+        csvFormat.print(name, builder, newRecord);
+        newRecord = false;
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Formats {@link Schema#sorted()} list of fields into a header String based 
on {@link CSVFormat}.

Review Comment:
   ```suggestion
      * Formats {@link Schema#sorted()} list of fields into a header based upon 
{@link CSVFormat}.
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java:
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing Csv files. */

Review Comment:
   ```suggestion
   /** Transforms for reading and writing CSV files. */
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Transforms for reading and writing CSV files. */

Review Comment:
   Please add:
   * sample snippets
   * call out supported/unsupported schema conversions
   * any other notable limitations



##########
sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvFormatLogicalTypeTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVFormat.Predefined;
+import org.junit.Test;
+
+/** Tests for {@link CsvFormatLogicalType}. */
+public class CsvFormatLogicalTypeTest {

Review Comment:
   ```suggestion
   @RunWith(JUnit4.class)
   public class CsvFormatLogicalTypeTest {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to