lukecwik commented on code in PR #24630: URL: https://github.com/apache/beam/pull/24630#discussion_r1046193793
########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import 
org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. */ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. 
+ */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. */ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ Review Comment: Ditto for `<pre>{@code ...` ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. 
*/ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) Review Comment: Could we resolve the nullness concerns in new code? ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. 
*/ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract Builder<T> setPreamble(String value); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, + * if available. 
+ */ + abstract Builder<T> setHeader(String value); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value); + + abstract Sink<T> build(); + } + } + + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> + implements HasDisplayData { + + /** Specifies a common prefix for all generated files. */ + public Write<T> to(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** The {@link CSVFormat} of the destination CSV file data. */ + public Write<T> withCSVFormat(CSVFormat format) { + return toBuilder().setCSVFormat(format).build(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: Review Comment: `<pre>{@code....` ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. 
*/ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract Builder<T> setPreamble(String value); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, + * if available. 
+ */ + abstract Builder<T> setHeader(String value); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value); + + abstract Sink<T> build(); + } + } + + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> + implements HasDisplayData { + + /** Specifies a common prefix for all generated files. */ + public Write<T> to(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** The {@link CSVFormat} of the destination CSV file data. */ + public Write<T> withCSVFormat(CSVFormat format) { + return toBuilder().setCSVFormat(format).build(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Write<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + /** Specifies the {@link Compression} of all generated shard files. */ + public Write<T> withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); + } + + /** + * Specifies to use a given fixed number of shards per window. See {@link + * FileIO.Write#withNumShards(int)} for details. + */ + public Write<T> withNumShards(Integer numShards) { + return toBuilder().setNumShards(numShards).build(); + } + + /** + * Specifies a directory into which all temporary files will be placed. See {@link + * FileIO.Write#withTempDirectory(String)}. 
+ */ + public Write<T> withTempDirectory(ResourceId value) { + return toBuilder().setTempDirectory(value).build(); + } + + abstract @Nullable String getFilenamePrefix(); + + abstract CSVFormat getCSVFormat(); + + abstract @Nullable String getPreamble(); + + abstract @Nullable Compression getCompression(); + + abstract @Nullable Integer getNumShards(); + + abstract String getFilenameSuffix(); + + abstract @Nullable ResourceId getTempDirectory(); + + abstract @Nullable List<String> getSchemaFields(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + abstract Builder<T> setFilenamePrefix(String value); + + abstract Builder<T> setCSVFormat(CSVFormat value); + + abstract Optional<CSVFormat> getCSVFormat(); + + abstract Builder<T> setPreamble(String value); + + abstract Builder<T> setCompression(Compression value); + + abstract Builder<T> setNumShards(Integer value); + + abstract Builder<T> setFilenameSuffix(String value); + + abstract Optional<String> getFilenameSuffix(); + + abstract Builder<T> setTempDirectory(ResourceId value); + + abstract Builder<T> setSchemaFields(List<String> value); + + abstract Write<T> autoBuild(); + + final Write<T> build() { + if (!getCSVFormat().isPresent()) { + setCSVFormat(CSVFormat.DEFAULT); + } + + if (!getFilenameSuffix().isPresent()) { + setFilenameSuffix(DEFAULT_FILENAME_SUFFIX); + } + + return autoBuild(); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (getFilenamePrefix() != null) { + builder.add(DisplayData.item("filenamePrefix", getFilenamePrefix())); + } + if (getCSVFormat() != null) { + builder.add(DisplayData.item("csvFormat", getCSVFormat().toString())); + } + if (getPreamble() != null) { + builder.add(DisplayData.item("preamble", getPreamble())); + } + if (getCompression() != null) { + builder.add(DisplayData.item("compression", getCompression().name())); + } + if (getNumShards() != null) { + 
builder.add(DisplayData.item("numShards", getNumShards())); + } + + builder.add(DisplayData.item("filenameSuffix", getFilenameSuffix())); + + if (getTempDirectory() != null) { + builder.add(DisplayData.item("tempDirectory", getTempDirectory().getFilename())); + } + } + + @Override + public PDone expand(PCollection<T> input) { + if (!input.hasSchema()) { + throw new IllegalArgumentException( + String.format( + "%s requires an input Schema. Note that only Row or user classes are supported. Consider using TextIO or FileIO directly when writing primitive types", + Write.class.getName())); + } + + Schema schema = input.getSchema(); + SerializableFunction<T, Row> toRowFn = input.getToRowFunction(); + PCollection<Row> rows = + input.apply(MapElements.into(rows()).via(toRowFn)).setRowSchema(schema); + rows.apply("writeRowsToCsv", buildFileIOWrite().via(buildSink(schema))); Review Comment: ```suggestion rows.apply("Write Rows To Csv", buildFileIOWrite().via(buildSink(schema))); ``` ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvPayloadSerializer.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Implementation of {@link PayloadSerializer} for a CSV format. */ +public class CsvPayloadSerializer implements PayloadSerializer { + + private final CSVFormat csvFormat; + private final List<String> schemaFields; + + /** + * Instantiates a {@link PayloadSerializer} for a CSV format. {@param csvFormat} defaults to + * {@link CSVFormat#DEFAULT} and {@param schemaFields} defaults to {@link Schema#sorted()} {@link + * Schema#getFieldNames()}. + */ + CsvPayloadSerializer( + Schema schema, @Nullable CSVFormat csvFormat, @Nullable List<String> schemaFields) { + CsvUtils.validateSchema(schema); + CsvUtils.validateHeaderAgainstSchema(schemaFields, schema); + if (csvFormat == null) { + csvFormat = CSVFormat.DEFAULT; + } + this.csvFormat = csvFormat; + + if (schemaFields == null) { + schemaFields = schema.sorted().getFieldNames(); + } + this.schemaFields = schemaFields; + } + + CSVFormat getCsvFormat() { + return csvFormat; + } + + List<String> getSchemaFields() { + return schemaFields; + } + + /** + * Serializes a {@link Row} to a CSV byte[] record using a {@link CSVFormat}. The schemaFields + * provided or generated by the constructor drive the subset and order of serialized {@link Row} + * fields. 
+ */ + @Override + public byte[] serialize(Row row) { + StringBuilder builder = new StringBuilder(); + try { + boolean newRecord = true; + for (String name : schemaFields) { + Object value = row.getValue(name); + if (value == null) { Review Comment: I would have expected that the CSVFormat converted nulls based upon https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Builder.html#setNullString-java.lang.String- ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. 
*/ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract Builder<T> setPreamble(String value); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, + * if available. 
+ */ + abstract Builder<T> setHeader(String value); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value); + + abstract Sink<T> build(); + } + } + + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> + implements HasDisplayData { + + /** Specifies a common prefix for all generated files. */ + public Write<T> to(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** The {@link CSVFormat} of the destination CSV file data. */ + public Write<T> withCSVFormat(CSVFormat format) { + return toBuilder().setCSVFormat(format).build(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Write<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + /** Specifies the {@link Compression} of all generated shard files. */ + public Write<T> withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); + } + + /** + * Specifies to use a given fixed number of shards per window. See {@link + * FileIO.Write#withNumShards(int)} for details. + */ + public Write<T> withNumShards(Integer numShards) { + return toBuilder().setNumShards(numShards).build(); + } + + /** + * Specifies a directory into which all temporary files will be placed. See {@link + * FileIO.Write#withTempDirectory(String)}. 
+ */ + public Write<T> withTempDirectory(ResourceId value) { + return toBuilder().setTempDirectory(value).build(); + } + + abstract @Nullable String getFilenamePrefix(); + + abstract CSVFormat getCSVFormat(); + + abstract @Nullable String getPreamble(); + + abstract @Nullable Compression getCompression(); + + abstract @Nullable Integer getNumShards(); + + abstract String getFilenameSuffix(); + + abstract @Nullable ResourceId getTempDirectory(); + + abstract @Nullable List<String> getSchemaFields(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + abstract Builder<T> setFilenamePrefix(String value); + + abstract Builder<T> setCSVFormat(CSVFormat value); + + abstract Optional<CSVFormat> getCSVFormat(); + + abstract Builder<T> setPreamble(String value); + + abstract Builder<T> setCompression(Compression value); + + abstract Builder<T> setNumShards(Integer value); + + abstract Builder<T> setFilenameSuffix(String value); + + abstract Optional<String> getFilenameSuffix(); + + abstract Builder<T> setTempDirectory(ResourceId value); + + abstract Builder<T> setSchemaFields(List<String> value); + + abstract Write<T> autoBuild(); + + final Write<T> build() { + if (!getCSVFormat().isPresent()) { + setCSVFormat(CSVFormat.DEFAULT); + } + + if (!getFilenameSuffix().isPresent()) { + setFilenameSuffix(DEFAULT_FILENAME_SUFFIX); Review Comment: Ditto for filename suffix to always start by setting our default and then allowing users to override it if they want. ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvPayloadSerializer.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Implementation of {@link PayloadSerializer} for a CSV format. */ +public class CsvPayloadSerializer implements PayloadSerializer { + + private final CSVFormat csvFormat; + private final List<String> schemaFields; + + /** + * Instantiates a {@link PayloadSerializer} for a CSV format. {@param csvFormat} defaults to + * {@link CSVFormat#DEFAULT} and {@param schemaFields} defaults to {@link Schema#sorted()} {@link + * Schema#getFieldNames()}. 
+ */ + CsvPayloadSerializer( + Schema schema, @Nullable CSVFormat csvFormat, @Nullable List<String> schemaFields) { + CsvUtils.validateSchema(schema); + CsvUtils.validateHeaderAgainstSchema(schemaFields, schema); + if (csvFormat == null) { + csvFormat = CSVFormat.DEFAULT; + } + this.csvFormat = csvFormat; + + if (schemaFields == null) { + schemaFields = schema.sorted().getFieldNames(); + } + this.schemaFields = schemaFields; + } + + CSVFormat getCsvFormat() { + return csvFormat; + } + + List<String> getSchemaFields() { + return schemaFields; + } + + /** + * Serializes a {@link Row} to a CSV byte[] record using a {@link CSVFormat}. The schemaFields + * provided or generated by the constructor drive the subset and order of serialized {@link Row} + * fields. + */ + @Override + public byte[] serialize(Row row) { + StringBuilder builder = new StringBuilder(); + try { + boolean newRecord = true; + for (String name : schemaFields) { + Object value = row.getValue(name); + if (value == null) { + value = ""; + } + csvFormat.print(value, builder, newRecord); + newRecord = false; + } + } catch (IOException e) { + throw new IllegalStateException(e); + } + return builder.toString().getBytes(StandardCharsets.UTF_8); + } + + /** + * Converts {@param bytes} to a {@link Row} based on the {@link CSVFormat} and schemaFields + * parameter in the constructor. Not yet implemented. See + * https://github.com/apache/beam/issues/24552. Review Comment: use `<a href=` tag ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvUtils.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Utilities to convert between CSV records, Beam rows, and user types. */ +public final class CsvUtils { + + public static final Set<FieldType> VALID_FIELD_TYPE_SET = Review Comment: We'll want to describe in CsvIO header the field types that are supported when converting to/from schema. ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. 
*/ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. */ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 Review Comment: consider using `<pre>{@code ...` See https://github.com/robertwb/incubator-beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L93 for an example when formatting this. ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. 
*/ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. */ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 Review Comment: Ditto for `<pre>{@code ...` ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import 
org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. */ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract Builder<T> setPreamble(String value); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, + * if available. 
+ */ + abstract Builder<T> setHeader(String value); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value); + + abstract Sink<T> build(); + } + } + + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> + implements HasDisplayData { + + /** Specifies a common prefix for all generated files. */ + public Write<T> to(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** The {@link CSVFormat} of the destination CSV file data. */ + public Write<T> withCSVFormat(CSVFormat format) { + return toBuilder().setCSVFormat(format).build(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Write<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + /** Specifies the {@link Compression} of all generated shard files. */ + public Write<T> withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); + } + + /** + * Specifies to use a given fixed number of shards per window. See {@link + * FileIO.Write#withNumShards(int)} for details. + */ + public Write<T> withNumShards(Integer numShards) { + return toBuilder().setNumShards(numShards).build(); + } + + /** + * Specifies a directory into which all temporary files will be placed. See {@link + * FileIO.Write#withTempDirectory(String)}. 
+ */ + public Write<T> withTempDirectory(ResourceId value) { + return toBuilder().setTempDirectory(value).build(); + } + + abstract @Nullable String getFilenamePrefix(); + + abstract CSVFormat getCSVFormat(); + + abstract @Nullable String getPreamble(); + + abstract @Nullable Compression getCompression(); + + abstract @Nullable Integer getNumShards(); + + abstract String getFilenameSuffix(); + + abstract @Nullable ResourceId getTempDirectory(); + + abstract @Nullable List<String> getSchemaFields(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + abstract Builder<T> setFilenamePrefix(String value); + + abstract Builder<T> setCSVFormat(CSVFormat value); + + abstract Optional<CSVFormat> getCSVFormat(); + + abstract Builder<T> setPreamble(String value); + + abstract Builder<T> setCompression(Compression value); + + abstract Builder<T> setNumShards(Integer value); + + abstract Builder<T> setFilenameSuffix(String value); + + abstract Optional<String> getFilenameSuffix(); + + abstract Builder<T> setTempDirectory(ResourceId value); + + abstract Builder<T> setSchemaFields(List<String> value); + + abstract Write<T> autoBuild(); + + final Write<T> build() { + if (!getCSVFormat().isPresent()) { + setCSVFormat(CSVFormat.DEFAULT); + } + + if (!getFilenameSuffix().isPresent()) { + setFilenameSuffix(DEFAULT_FILENAME_SUFFIX); + } + + return autoBuild(); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (getFilenamePrefix() != null) { + builder.add(DisplayData.item("filenamePrefix", getFilenamePrefix())); + } + if (getCSVFormat() != null) { + builder.add(DisplayData.item("csvFormat", getCSVFormat().toString())); + } + if (getPreamble() != null) { + builder.add(DisplayData.item("preamble", getPreamble())); + } + if (getCompression() != null) { + builder.add(DisplayData.item("compression", getCompression().name())); + } + if (getNumShards() != null) { + 
builder.add(DisplayData.item("numShards", getNumShards())); + } + + builder.add(DisplayData.item("filenameSuffix", getFilenameSuffix())); + + if (getTempDirectory() != null) { + builder.add(DisplayData.item("tempDirectory", getTempDirectory().getFilename())); + } + } + + @Override + public PDone expand(PCollection<T> input) { + if (!input.hasSchema()) { + throw new IllegalArgumentException( + String.format( + "%s requires an input Schema. Note that only Row or user classes are supported. Consider using TextIO or FileIO directly when writing primitive types", + Write.class.getName())); + } + + Schema schema = input.getSchema(); + SerializableFunction<T, Row> toRowFn = input.getToRowFunction(); + PCollection<Row> rows = + input.apply(MapElements.into(rows()).via(toRowFn)).setRowSchema(schema); Review Comment: It is always good to give names for each PTransform that is applied so that it is easier to understand what is occurring. ```suggestion input.apply("To Rows", MapElements.into(rows()).via(toRowFn)).setRowSchema(schema); ``` ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. 
*/ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); Review Comment: Use SerializableFunction or ProcessFunction depending on whether you want to allow the user to throw an exception or not. ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. 
*/ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. */ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract Builder<T> setPreamble(String value); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, + * if available. 
+ */ + abstract Builder<T> setHeader(String value); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value); + + abstract Sink<T> build(); + } + } + + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> + implements HasDisplayData { + + /** Specifies a common prefix for all generated files. */ + public Write<T> to(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** The {@link CSVFormat} of the destination CSV file data. */ + public Write<T> withCSVFormat(CSVFormat format) { + return toBuilder().setCSVFormat(format).build(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Write<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + /** Specifies the {@link Compression} of all generated shard files. */ + public Write<T> withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); + } + + /** + * Specifies to use a given fixed number of shards per window. See {@link + * FileIO.Write#withNumShards(int)} for details. + */ + public Write<T> withNumShards(Integer numShards) { + return toBuilder().setNumShards(numShards).build(); + } + + /** + * Specifies a directory into which all temporary files will be placed. See {@link + * FileIO.Write#withTempDirectory(String)}. 
+ */ + public Write<T> withTempDirectory(ResourceId value) { + return toBuilder().setTempDirectory(value).build(); + } + + abstract @Nullable String getFilenamePrefix(); + + abstract CSVFormat getCSVFormat(); + + abstract @Nullable String getPreamble(); + + abstract @Nullable Compression getCompression(); + + abstract @Nullable Integer getNumShards(); + + abstract String getFilenameSuffix(); + + abstract @Nullable ResourceId getTempDirectory(); + + abstract @Nullable List<String> getSchemaFields(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + abstract Builder<T> setFilenamePrefix(String value); + + abstract Builder<T> setCSVFormat(CSVFormat value); + + abstract Optional<CSVFormat> getCSVFormat(); + + abstract Builder<T> setPreamble(String value); + + abstract Builder<T> setCompression(Compression value); + + abstract Builder<T> setNumShards(Integer value); + + abstract Builder<T> setFilenameSuffix(String value); + + abstract Optional<String> getFilenameSuffix(); + + abstract Builder<T> setTempDirectory(ResourceId value); + + abstract Builder<T> setSchemaFields(List<String> value); + + abstract Write<T> autoBuild(); + + final Write<T> build() { + if (!getCSVFormat().isPresent()) { + setCSVFormat(CSVFormat.DEFAULT); Review Comment: Do you differentiate between the user setting CSVFormat.DEFAULT and us setting it? If not, consider dropping Optional and ensuring that we set this everywhere when constructing the builder. ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvPayloadSerializer.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Implementation of {@link PayloadSerializer} for a CSV format. */ +public class CsvPayloadSerializer implements PayloadSerializer { + + private final CSVFormat csvFormat; + private final List<String> schemaFields; + + /** + * Instantiates a {@link PayloadSerializer} for a CSV format. {@param csvFormat} defaults to + * {@link CSVFormat#DEFAULT} and {@param schemaFields} defaults to {@link Schema#sorted()} {@link + * Schema#getFieldNames()}. + */ + CsvPayloadSerializer( Review Comment: It is easier to reason about this if you create constructors with only non-nullable fields and have the default operation done, OR have the caller own the Schema->fields conversion and pass in CSVFormat#DEFAULT. Note: will CSVFormat ever actually be null? ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import 
org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ +@SuppressWarnings({ + "unused", + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CsvIO { + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in CSV format. */ + public static <T> Write<T> write() { + return new AutoValue_CsvIO_Write.Builder<T>().build(); + } + + /** Instantiates a {@link Write} for {@link Row}s in CSV format. */ + public static Write<Row> writeRows() { + return new AutoValue_CsvIO_Write.Builder<Row>().build(); + } + + /** Implementation of {@link FileIO.Sink}. */ + @AutoValue + public abstract static class Sink<T> implements FileIO.Sink<T> { + + public static <T> Builder<T> builder() { + return new AutoValue_CsvIO_Sink.Builder<>(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Sink<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + private transient @Nullable PrintWriter writer; + + /** + * Opens a {@link WritableByteChannel} for writing CSV files. Writes the {@link #getPreamble()} + * if available followed by the {@link #getHeader()}. + */ + @Override + public void open(WritableByteChannel channel) throws IOException { + writer = + new PrintWriter( + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8))); + if (getPreamble() != null) { + writer.println(getPreamble()); + } + writer.println(getHeader()); + } + + /** Serializes and writes the {@param element} to a file. 
*/ + @Override + public void write(T element) throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + String line = getFormatFunction().apply(element); + writer.println(line); + } + + @Override + public void flush() throws IOException { + if (writer == null) { + throw new IllegalStateException( + String.format("%s writer is null", PrintWriter.class.getName())); + } + writer.flush(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract @Nullable String getPreamble(); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, if + * available. + */ + abstract String getHeader(); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract SimpleFunction<T, String> getFormatFunction(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + abstract Builder<T> setPreamble(String value); + + /** + * The column names of the CSV file written at the top line of each shard after the preamble, + * if available. 
+ */ + abstract Builder<T> setHeader(String value); + + /** A {@link SimpleFunction} for converting a {@param T} to a CSV formatted string. */ + abstract Builder<T> setFormatFunction(SimpleFunction<T, String> value); + + abstract Sink<T> build(); + } + } + + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> + implements HasDisplayData { + + /** Specifies a common prefix for all generated files. */ + public Write<T> to(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** The {@link CSVFormat} of the destination CSV file data. */ + public Write<T> withCSVFormat(CSVFormat format) { + return toBuilder().setCSVFormat(format).build(); + } + + /** + * Not to be confused with the CSV header, it is content written to the top of every sharded + * file prior to the header. In the example below, all the text proceeding the header + * 'column1,column2,column3' is the preamble. + * + * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator: + * John Doe + * + * <p>column1,column2,column3 1,2,3 4,5,6 + */ + public Write<T> withPreamble(String preamble) { + return toBuilder().setPreamble(preamble).build(); + } + + /** Specifies the {@link Compression} of all generated shard files. */ + public Write<T> withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); + } + + /** + * Specifies to use a given fixed number of shards per window. See {@link + * FileIO.Write#withNumShards(int)} for details. + */ + public Write<T> withNumShards(Integer numShards) { + return toBuilder().setNumShards(numShards).build(); + } + + /** + * Specifies a directory into which all temporary files will be placed. See {@link + * FileIO.Write#withTempDirectory(String)}. 
+ */ + public Write<T> withTempDirectory(ResourceId value) { + return toBuilder().setTempDirectory(value).build(); + } + + abstract @Nullable String getFilenamePrefix(); + + abstract CSVFormat getCSVFormat(); + + abstract @Nullable String getPreamble(); + + abstract @Nullable Compression getCompression(); + + abstract @Nullable Integer getNumShards(); + + abstract String getFilenameSuffix(); + + abstract @Nullable ResourceId getTempDirectory(); + + abstract @Nullable List<String> getSchemaFields(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + abstract Builder<T> setFilenamePrefix(String value); + + abstract Builder<T> setCSVFormat(CSVFormat value); + + abstract Optional<CSVFormat> getCSVFormat(); + + abstract Builder<T> setPreamble(String value); + + abstract Builder<T> setCompression(Compression value); + + abstract Builder<T> setNumShards(Integer value); + + abstract Builder<T> setFilenameSuffix(String value); + + abstract Optional<String> getFilenameSuffix(); + + abstract Builder<T> setTempDirectory(ResourceId value); + + abstract Builder<T> setSchemaFields(List<String> value); + + abstract Write<T> autoBuild(); + + final Write<T> build() { + if (!getCSVFormat().isPresent()) { + setCSVFormat(CSVFormat.DEFAULT); + } + + if (!getFilenameSuffix().isPresent()) { + setFilenameSuffix(DEFAULT_FILENAME_SUFFIX); + } + + return autoBuild(); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (getFilenamePrefix() != null) { + builder.add(DisplayData.item("filenamePrefix", getFilenamePrefix())); + } + if (getCSVFormat() != null) { + builder.add(DisplayData.item("csvFormat", getCSVFormat().toString())); + } + if (getPreamble() != null) { + builder.add(DisplayData.item("preamble", getPreamble())); + } + if (getCompression() != null) { + builder.add(DisplayData.item("compression", getCompression().name())); + } + if (getNumShards() != null) { + 
builder.add(DisplayData.item("numShards", getNumShards())); + } + + builder.add(DisplayData.item("filenameSuffix", getFilenameSuffix())); + + if (getTempDirectory() != null) { + builder.add(DisplayData.item("tempDirectory", getTempDirectory().getFilename())); + } + } + + @Override + public PDone expand(PCollection<T> input) { + if (!input.hasSchema()) { + throw new IllegalArgumentException( + String.format( + "%s requires an input Schema. Note that only Row or user classes are supported. Consider using TextIO or FileIO directly when writing primitive types", + Write.class.getName())); + } + + Schema schema = input.getSchema(); + SerializableFunction<T, Row> toRowFn = input.getToRowFunction(); + PCollection<Row> rows = + input.apply(MapElements.into(rows()).via(toRowFn)).setRowSchema(schema); + rows.apply("writeRowsToCsv", buildFileIOWrite().via(buildSink(schema))); + return PDone.in(input.getPipeline()); + } + + /** + * Builds a header using {@link CSVFormat} based on either a {@link Schema#sorted()} {@link + * Schema#getFieldNames()} if {@link #getSchemaFields()} is null or {@link #getSchemaFields()}. + */ + String buildHeader(Schema schema) { + if (getSchemaFields() != null) { + return CsvUtils.buildHeaderFrom(getSchemaFields(), getCSVFormat()); + } + return CsvUtils.buildHeaderFrom(schema.sorted(), getCSVFormat()); + } + + /** Builds a {@link Sink} for writing {@link Row} serialized using {@link CSVFormat}. */ + Sink<Row> buildSink(Schema schema) { + List<String> schemaFields = null; + String header = null; + if (getSchemaFields() != null) { + schemaFields = getSchemaFields(); + } + return Sink.<Row>builder() + .setPreamble(getPreamble()) + .setHeader(buildHeader(schema)) + .setFormatFunction( + CsvUtils.getRowToCsvStringFunction(schema, getCSVFormat(), schemaFields)) + .build(); + } + + /** Builds a {@link FileIO.Write} with a {@link Sink}. 
*/ + FileIO.Write<Void, Row> buildFileIOWrite() { + checkArgument(getFilenamePrefix() != null, "to() is required"); + + ResourceId prefix = + FileSystems.matchNewResource(getFilenamePrefix(), false /* isDirectory */); + + FileIO.Write<Void, Row> write = + FileIO.<Row>write() + .to(prefix.getCurrentDirectory().toString()) + .withPrefix(Objects.requireNonNull(prefix.getFilename())); + + if (getCompression() != null) { + write = write.withCompression(getCompression()); + } + + if (getNumShards() != null) { + write = write.withNumShards(getNumShards()); + } + + if (getFilenameSuffix() != null) { + write = write.withSuffix(getFilenameSuffix()); + } + + if (getTempDirectory() != null) { + write = write.withTempDirectory(Objects.requireNonNull(getTempDirectory().getFilename())); + } + + return write; + } Review Comment: nit: Consider inlining these into expand since these are small methods and we don't expect them to be used outside of expand() ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvUtils.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Utilities to convert between CSV records, Beam rows, and user types. */ +public final class CsvUtils { Review Comment: Do we want this to be public? ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvUtils.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Utilities to convert between CSV records, Beam rows, and user types. */ +public final class CsvUtils { + + public static final Set<FieldType> VALID_FIELD_TYPE_SET = + ImmutableSet.of( + FieldType.BYTE, + FieldType.BOOLEAN, + FieldType.DATETIME, + FieldType.DECIMAL, + FieldType.DOUBLE, + FieldType.INT16, + FieldType.INT32, + FieldType.INT64, + FieldType.FLOAT, + FieldType.STRING); + + /** + * Planned for: TODO(https://github.com/apache/beam/issues/24552) Returns a {@link SimpleFunction} + * that converts a CSV byte[] record to a Beam {@link Row}. Providing {@param schemaFields} + * determines the subset and order of {@param beamSchema} fields expected from the CSV byte[] + * record. Otherwise, the expected order derives from the {@link Schema#sorted()} field order. + */ + public static SimpleFunction<byte[], Row> getCsvBytesToRowFunction( + Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> schemaFields) { + CsvPayloadSerializer payloadSerializer = + new CsvPayloadSerializer(beamSchema, format, schemaFields); + return new CsvBytesToRowFn(payloadSerializer); + } + + /** + * Planned for: TODO(https://github.com/apache/beam/issues/24552) Returns a {@link SimpleFunction} + * that converts a CSV String record to a Beam {@link Row}. 
Providing {@param schemaFields} + * determines the subset and order of {@param beamSchema} fields expected from the CSV String + * record. Otherwise, the expected order derives from the {@link Schema#sorted()} field order. + */ + public static SimpleFunction<String, Row> getCsvStringToRowFunction( + Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> schemaFields) { + CsvPayloadSerializer payloadSerializer = + new CsvPayloadSerializer(beamSchema, format, schemaFields); + return new CsvStringToRowFn(payloadSerializer); + } + + /** + * Returns a {@link SimpleFunction} that converts a {@link Row} to a CSV byte[] record. Providing + * {@param schemaFields} determines the subset and order of {@param beamSchema} fields of the + * resulting CSV byte[] record. Otherwise, the order derives from the {@link Schema#sorted()} + * field order. + */ + public static SimpleFunction<Row, byte[]> getRowToCsvBytesFunction( + Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> schemaFields) { + CsvPayloadSerializer payloadSerializer = + new CsvPayloadSerializer(beamSchema, format, schemaFields); + return new RowToCsvBytesFn(payloadSerializer); + } + + /** + * Returns a {@link SimpleFunction} that converts a {@link Row} to a CSV String record. Providing + * {@param schemaFields} determines the subset and order of {@param beamSchema} fields of the + * resulting CSV String record. Otherwise, the order derives from the {@link Schema#sorted()} + * field order. + */ + public static SimpleFunction<Row, String> getRowToCsvStringFunction( + Schema beamSchema, @Nullable CSVFormat format, @Nullable List<String> schemaFields) { + + CsvPayloadSerializer payloadSerializer = + new CsvPayloadSerializer(beamSchema, format, schemaFields); + return new RowToCsvStringFn(payloadSerializer); + } + + /** Formats {@param columns} into a header String based on {@link CSVFormat}. 
*/ + static String buildHeaderFrom(List<String> columns, CSVFormat csvFormat) { + StringBuilder builder = new StringBuilder(); + try { + boolean newRecord = true; + for (String name : columns) { + csvFormat.print(name, builder, newRecord); + newRecord = false; + } + } catch (IOException e) { + throw new IllegalStateException(e); + } + return builder.toString(); + } + + /** + * Formats {@link Schema#sorted()} list of fields into a header String based on {@link CSVFormat}. Review Comment: ```suggestion * Formats {@link Schema#sorted()} list of fields into a header based upon {@link CSVFormat}. ``` ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java: ########## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing Csv files. */ Review Comment: ```suggestion /** Transforms for reading and writing CSV files. */ ``` ########## sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import 
org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Transforms for reading and writing CSV files. */ Review Comment: Please add: * sample snippets * call out supported/unsupported schema conversions * any other notable limitations ########## sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvFormatLogicalTypeTest.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVFormat.Predefined; +import org.junit.Test; + +/** Tests for {@link CsvFormatLogicalType}. */ +public class CsvFormatLogicalTypeTest { Review Comment: ```suggestion @RunWith(JUnit4.class) public class CsvFormatLogicalTypeTest { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
