This is an automated email from the ASF dual-hosted git repository. User lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.
The following commit(s) were added to refs/heads/master by this push: new 66aa607845b [24469] Implement CsvIO.Write and supporting classes (#24630) 66aa607845b is described below commit 66aa607845b43e4431723e9382755389bbba0412 Author: Damon <damondoug...@users.noreply.github.com> AuthorDate: Fri Jan 27 23:45:59 2023 +0000 [24469] Implement CsvIO.Write and supporting classes (#24630) This PR closes #24469 and addresses #20312 by implementing CsvIO.Write and supporting classes. Please see #24469 for more details. --- sdks/java/io/csv/build.gradle | 34 ++ .../java/org/apache/beam/sdk/io/csv/CsvIO.java | 458 +++++++++++++++++++++ .../apache/beam/sdk/io/csv/CsvRowConversions.java | 152 +++++++ .../org/apache/beam/sdk/io/csv/package-info.java | 20 + .../org/apache/beam/sdk/io/csv/CsvIOTestData.java | 106 +++++ .../apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java | 265 ++++++++++++ .../org/apache/beam/sdk/io/csv/CsvIOWriteTest.java | 308 ++++++++++++++ .../beam/sdk/io/csv/RowToCsvCSVFormatTest.java | 353 ++++++++++++++++ .../io/csv/RowToCsvPredefinedCSVFormatsTest.java | 284 +++++++++++++ settings.gradle.kts | 1 + 10 files changed, 1981 insertions(+) diff --git a/sdks/java/io/csv/build.gradle b/sdks/java/io/csv/build.gradle new file mode 100644 index 00000000000..ac1a4ba16e6 --- /dev/null +++ b/sdks/java/io/csv/build.gradle @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.csv' +) + +description = "Apache Beam :: SDKs :: Java :: IO :: CSV" +ext.summary = "IO to read and write CSV files." + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.commons_csv + implementation library.java.vendored_guava_26_0_jre + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} \ No newline at end of file diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java new file mode 100644 index 00000000000..084e7de6310 --- /dev/null +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.util.Objects.requireNonNull; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.ShardNameTemplate; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.WriteFilesResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.commons.csv.CSVFormat; + +/** + * {@link PTransform}s for reading and writing CSV files. + * + * <h2>Reading CSV files</h2> + * + * <p>Reading from CSV files is not yet implemented. Please see <a + * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>. 
+ * + * <h2>Writing CSV files</h2> + * + * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, using {@link + * CsvIO#writeRows} or {@link CsvIO#write}. {@link CsvIO.Write} supports writing {@link Row} or + * custom Java types using an inferred {@link Schema}. Examples below show both scenarios. See the + * Beam Programming Guide on <a + * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring + * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom + * Java type. + * + * <p>{@link CsvIO.Write} only supports writing the parts of {@link Schema} aware types that do not + * contain any nested {@link FieldType}s such a {@link + * org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or repeated {@link + * org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link + * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s. + * + * <h3>Example usage:</h3> + * + * <p>Suppose we have the following <code>Transaction</code> class annotated with + * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}: + * + * <pre>{@code @DefaultSchema(JavaBeanSchema.class) + * public class Transaction { + * public Transaction() { … } + * public Long getTransactionId(); + * public void setTransactionId(Long transactionId) { … } + * public String getBank() { … } + * public void setBank(String bank) { … } + * public double getPurchaseAmount() { … } + * public void setPurchaseAmount(double purchaseAmount) { … } + * } + * }</pre> + * + * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or many CSV files + * automatically creating the header based on its inferred {@link Schema}. + * + * <pre>{@code + * PCollection<Transaction> transactions = ... 
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT)); + * }</pre> + * + * <p>The resulting CSV files will look like the following where the header is repeated for every + * file, whereas by default, {@link CsvIO.Write} will write all fields in <b>sorted order</b> of the + * field names. + * + * <pre>{@code + * bank,purchaseAmount,transactionId + * A,10.23,12345 + * B,54.65,54321 + * C,11.76,98765 + * }</pre> + * + * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link + * CSVFormat#withHeader}. Note, however, the following constraints: + * + * <ol> + * <li>Each header column must match a field name in the {@link Schema}; matching is case + * sensitive. + * <li>Matching header columns must match {@link Schema} fields that are valid {@link FieldType}s; + * see {@link #VALID_FIELD_TYPE_SET}. + * <li>{@link CSVFormat} only allows repeated header columns when {@link + * CSVFormat#withAllowDuplicateHeaderNames} + * </ol> + * + * <p>The following example shows the use of {@link CSVFormat#withHeader} to control the order and + * subset of <code>Transaction</code> fields. + * + * <pre>{@code + * PCollection<Transaction> transactions ... + * transactions.apply( + * CsvIO + * .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount")) + * ); + * }</pre> + * + * <p>The resulting CSV files will look like the following where the header is repeated for every + * file, but will only include the subset of fields in their listed order. + * + * <pre>{@code + * transactionId,purchaseAmount + * 12345,10.23 + * 54321,54.65 + * 98765,11.76 + * }</pre> + * + * <p>In addition to header customization, {@link CsvIO.Write} supports {@link + * CSVFormat#withHeaderComments} as shown below. Note that {@link CSVFormat#withCommentMarker} is + * required when specifying header comments. + * + * <pre>{@code + * PCollection<Transaction> transactions = ... 
+ * transactions + * .apply( + * CsvIO.<Transaction>write("path/to/folder/prefix", + * CSVFormat.DEFAULT + * .withCommentMarker('#') + * .withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe") + * ); + * }</pre> + * + * <p>The resulting CSV files will look like the following where the header and header comments are + * repeated for every shard file. + * + * <pre>{@code + * # Bank Report + * # 1970-01-01 + * # Operator: John Doe + * bank,purchaseAmount,transactionId + * A,10.23,12345 + * B,54.65,54321 + * C,11.76,98765 + * }</pre> + * + * <p>A {@link PCollection} of {@link Row}s works just like custom Java types illustrated above, + * except we use {@link CsvIO#writeRows} as shown below for the same {@code Transaction} class. We + * derive {@code Transaction}'s {@link Schema} using a {@link + * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that + * hard-coding the {@link Row}s below is for illustration purposes. Developers are instead + * encouraged to take advantage of {@link + * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction}. 
+ * + * <pre>{@code + * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider(); + * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class)); + * PCollection<Row> transactions = pipeline.apply(Create.of( + * Row + * .withSchema(schema) + * .withFieldValue("bank", "A") + * .withFieldValue("purchaseAmount", 10.23) + * .withFieldValue("transactionId", "12345") + * .build(), + * Row + * .withSchema(schema) + * .withFieldValue("bank", "B") + * .withFieldValue("purchaseAmount", 54.65) + * .withFieldValue("transactionId", "54321") + * .build(), + * Row + * .withSchema(schema) + * .withFieldValue("bank", "C") + * .withFieldValue("purchaseAmount", 11.76) + * .withFieldValue("transactionId", "98765") + * .build() + * ); + * + * transactions.apply( + * CsvIO + * .writeRowsTo("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT) + * ); + * }</pre> + * + * <p>Writing the transactions {@link PCollection} of {@link Row}s would yield the following CSV + * file content. + * + * <pre>{@code + * bank,purchaseAmount,transactionId + * A,10.23,12345 + * B,54.65,54321 + * C,11.76,98765 + * }</pre> + * + * {@link CsvIO.Write} does not support the following {@link CSVFormat} properties and will throw an + * {@link IllegalArgumentException}. + * + * <ul> + * <li>{@link CSVFormat#withAllowMissingColumnNames} + * <li>{@link CSVFormat#withAutoFlush} + * <li>{@link CSVFormat#withIgnoreHeaderCase} + * <li>{@link CSVFormat#withIgnoreSurroundingSpaces} + * </ul> + */ +public class CsvIO { + /** + * The valid {@link Schema.FieldType} from which {@link CsvIO} converts CSV records to the fields. 
+ * + * <ul> + * <li>{@link FieldType#BYTE} + * <li>{@link FieldType#BOOLEAN} + * <li>{@link FieldType#DATETIME} + * <li>{@link FieldType#DECIMAL} + * <li>{@link FieldType#DOUBLE} + * <li>{@link FieldType#INT16} + * <li>{@link FieldType#INT32} + * <li>{@link FieldType#INT64} + * <li>{@link FieldType#FLOAT} + * <li>{@link FieldType#STRING} + * </ul> + */ + public static final Set<Schema.FieldType> VALID_FIELD_TYPE_SET = + ImmutableSet.of( + FieldType.BYTE, + FieldType.BOOLEAN, + FieldType.DATETIME, + FieldType.DECIMAL, + FieldType.DOUBLE, + FieldType.INT16, + FieldType.INT32, + FieldType.INT64, + FieldType.FLOAT, + FieldType.STRING); + + static final String DEFAULT_FILENAME_SUFFIX = ".csv"; + + /** Instantiates a {@link Write} for writing user types in {@link CSVFormat} format. */ + public static <T> Write<T> write(String to, CSVFormat csvFormat) { + return new AutoValue_CsvIO_Write.Builder<T>() + .setTextIOWrite(createDefaultTextIOWrite(to)) + .setCSVFormat(csvFormat) + .build(); + } + + /** Instantiates a {@link Write} for writing {@link Row}s in {@link CSVFormat} format. */ + public static Write<Row> writeRows(String to, CSVFormat csvFormat) { + return new AutoValue_CsvIO_Write.Builder<Row>() + .setTextIOWrite(createDefaultTextIOWrite(to)) + .setCSVFormat(csvFormat) + .build(); + } + + /** {@link PTransform} for writing CSV files. */ + @AutoValue + public abstract static class Write<T> + extends PTransform<PCollection<T>, WriteFilesResult<String>> { + + /** Specifies the {@link Compression} of all generated shard files. */ + public Write<T> withCompression(Compression compression) { + return toBuilder().setTextIOWrite(getTextIOWrite().withCompression(compression)).build(); + } + + /** Specifies all data written without spilling, simplifying the pipeline. */ + public Write<T> withNoSpilling() { + return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build(); + } + + /** + * Specifies to use a given fixed number of shards per window. 
See {@link + * TextIO.Write#withNumShards}. + */ + public Write<T> withNumShards(Integer numShards) { + return toBuilder().setTextIOWrite(getTextIOWrite().withNumShards(numShards)).build(); + } + + /** + * Forces a single file as output and empty shard name template. See {@link + * TextIO.Write#withoutSharding}. + */ + public Write<T> withoutSharding() { + return toBuilder().setTextIOWrite(getTextIOWrite().withoutSharding()).build(); + } + + /** + * Uses the given {@link ShardNameTemplate} for naming output files. See {@link + * TextIO.Write#withShardNameTemplate}. + */ + public Write<T> withShardTemplate(String shardTemplate) { + return toBuilder() + .setTextIOWrite(getTextIOWrite().withShardNameTemplate(shardTemplate)) + .build(); + } + + /** Configures the filename suffix for written files. See {@link TextIO.Write#withSuffix}. */ + public Write<T> withSuffix(String suffix) { + return toBuilder().setTextIOWrite(getTextIOWrite().withSuffix(suffix)).build(); + } + + /** + * Set the base directory used to generate temporary files. See {@link + * TextIO.Write#withTempDirectory}. + */ + public Write<T> withTempDirectory(ResourceId tempDirectory) { + return toBuilder().setTextIOWrite(getTextIOWrite().withTempDirectory(tempDirectory)).build(); + } + + /** + * Preserves windowing of input elements and writes them to files based on the element's window. + * See {@link TextIO.Write#withWindowedWrites}. + */ + public Write<T> withWindowedWrites() { + return toBuilder().setTextIOWrite(getTextIOWrite().withWindowedWrites()).build(); + } + + /** + * Returns a transform for writing to text files like this one but that has the given {@link + * FileBasedSink.WritableByteChannelFactory} to be used by the {@link FileBasedSink} during + * output. See {@link TextIO.Write#withWritableByteChannelFactory}. 
+ */ + public Write<T> withWritableByteChannelFactory( + FileBasedSink.WritableByteChannelFactory writableByteChannelFactory) { + return toBuilder() + .setTextIOWrite( + getTextIOWrite().withWritableByteChannelFactory(writableByteChannelFactory)) + .build(); + } + + /** The underlying {@link FileIO.Write} that writes converted input to CSV formatted output. */ + abstract TextIO.Write getTextIOWrite(); + + /** The {@link CSVFormat} to inform headers, header comments, and format CSV row content. */ + abstract CSVFormat getCSVFormat(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * The underlying {@link FileIO.Write} that writes converted input to CSV formatted output. + */ + abstract Builder<T> setTextIOWrite(TextIO.Write value); + + /** The {@link CSVFormat} to convert input. Defaults to {@link CSVFormat#DEFAULT}. */ + abstract Builder<T> setCSVFormat(CSVFormat value); + + abstract CSVFormat getCSVFormat(); + + abstract Write<T> autoBuild(); + + final Write<T> build() { + + if (getCSVFormat().getHeaderComments() != null) { + checkArgument( + getCSVFormat().isCommentMarkerSet(), + "CSVFormat withCommentMarker required when withHeaderComments"); + } + + return autoBuild(); + } + } + + @Override + public WriteFilesResult<String> expand(PCollection<T> input) { + if (!input.hasSchema()) { + throw new IllegalArgumentException( + String.format( + "%s requires an input Schema. Note that only Row or user classes are supported. 
Consider using TextIO or FileIO directly when writing primitive types", + Write.class.getName())); + } + + Schema schema = input.getSchema(); + + RowCoder rowCoder = RowCoder.of(schema); + + PCollection<Row> rows = + input + .apply("To Rows", MapElements.into(rows()).via(input.getToRowFunction())) + .setCoder(rowCoder); + + CSVFormat csvFormat = buildHeaderFromSchemaIfNeeded(getCSVFormat(), schema); + + TextIO.Write write = getTextIOWrite(); + + write = writeWithCSVFormatHeaderAndComments(csvFormat, write); + + SerializableFunction<Row, String> toCsvFn = + CsvRowConversions.RowToCsv.builder() + .setCSVFormat(csvFormat) + .setSchema(input.getSchema()) + .build(); + + PCollection<String> csv = rows.apply("To CSV", MapElements.into(strings()).via(toCsvFn)); + + return csv.apply("Write CSV", write.withOutputFilenames()); + } + + private static CSVFormat buildHeaderFromSchemaIfNeeded(CSVFormat csvFormat, Schema schema) { + if (csvFormat.getHeader() == null) { + csvFormat = csvFormat.withHeader(schema.sorted().getFieldNames().toArray(new String[0])); + } + + return csvFormat; + } + + private static TextIO.Write writeWithCSVFormatHeaderAndComments( + CSVFormat csvFormat, TextIO.Write write) { + + if (csvFormat.getSkipHeaderRecord()) { + return write; + } + + String[] header = requireNonNull(csvFormat.getHeader()); + List<String> result = new ArrayList<>(); + if (csvFormat.getHeaderComments() != null) { + for (String comment : csvFormat.getHeaderComments()) { + result.add(csvFormat.getCommentMarker() + " " + comment); + } + } + + CSVFormat withoutHeaderComments = csvFormat.withHeaderComments(); + + result.add( + withoutHeaderComments + // The withSkipHeaderRecord parameter prevents CSVFormat from outputting two copies of + // the header. 
+ .withSkipHeaderRecord() + .format((Object[]) header)); + + return write.withHeader(String.join("\n", result)); + } + } + + private static TextIO.Write createDefaultTextIOWrite(String to) { + return TextIO.write().to(to).withSuffix(DEFAULT_FILENAME_SUFFIX); + } +} diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java new file mode 100644 index 00000000000..6faccfe021f --- /dev/null +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import static org.apache.beam.sdk.io.csv.CsvIO.VALID_FIELD_TYPE_SET; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.csv.CSVFormat; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** Contains classes and methods to help with converting between {@link Row} and CSV strings. */ +class CsvRowConversions { + + /** Converts between {@link Row} and CSV string using a {@link CSVFormat}. */ + @AutoValue + abstract static class RowToCsv implements SerializableFunction<Row, String> { + + static Builder builder() { + return new AutoValue_CsvRowConversions_RowToCsv.Builder(); + } + + /** The expected {@link Schema} of the {@link Row} input. */ + abstract Schema getSchema(); + + /** The {@link CSVFormat} of the converted {@link Row} input. */ + abstract CSVFormat getCSVFormat(); + + /** Converts a {@link Row} to a CSV string formatted using {@link #getCSVFormat}. 
*/ + @Override + public String apply(Row input) { + Row safeInput = checkNotNull(input); + String[] header = getHeader(); + Object[] values = new Object[header.length]; + for (int i = 0; i < header.length; i++) { + values[i] = safeInput.getValue(header[i]); + } + return getCSVFormat().format(values); + } + + @NonNull + String[] getHeader() { + return checkNotNull(getCSVFormat().getHeader()); + } + + @AutoValue.Builder + abstract static class Builder { + + /** The expected {@link Schema} of the {@link Row} input. */ + abstract Builder setSchema(Schema schema); + + abstract Schema getSchema(); + + /** The {@link CSVFormat} of the converted {@link Row} input. */ + abstract Builder setCSVFormat(CSVFormat format); + + abstract CSVFormat getCSVFormat(); + + abstract RowToCsv autoBuild(); + + final RowToCsv build() { + checkArgument(getSchema().getFieldCount() > 0, "Schema has no fields"); + setCSVFormat( + getCSVFormat() + // CSVFormat was designed to write to a single file. + // Therefore, we need to apply withSkipHeaderRecord to prevent CSVFormat to apply + // its header to each converted Row in the context of RowToCsv. + .withSkipHeaderRecord() + // Delegate to TextIO.Write.withDelimiter instead. 
+ .withRecordSeparator(' ') + .withHeaderComments()); + validateCSVFormat(getCSVFormat()); + validateHeaderAgainstSchema(getCSVFormat().getHeader(), getSchema()); + + return autoBuild(); + } + } + } + + private static void validateCSVFormat(CSVFormat csvFormat) { + String[] header = checkNotNull(csvFormat.getHeader(), "CSVFormat withHeader is required"); + + checkArgument(header.length > 0, "CSVFormat withHeader requires at least one column"); + + checkArgument(!csvFormat.getAutoFlush(), "withAutoFlush is an illegal CSVFormat setting"); + + checkArgument( + !csvFormat.getIgnoreHeaderCase(), "withIgnoreHeaderCase is an illegal CSVFormat setting"); + + checkArgument( + !csvFormat.getAllowMissingColumnNames(), + "withAllowMissingColumnNames is an illegal CSVFormat setting"); + + checkArgument( + !csvFormat.getIgnoreSurroundingSpaces(), + "withIgnoreSurroundingSpaces is an illegal CSVFormat setting"); + } + + private static void validateHeaderAgainstSchema(String[] csvHeader, Schema schema) { + Set<String> distinctColumns = new HashSet<>(Arrays.asList(csvHeader)); + List<String> mismatchColumns = new ArrayList<>(); + List<String> invalidTypes = new ArrayList<>(); + Set<TypeName> validTypeNames = + VALID_FIELD_TYPE_SET.stream().map(FieldType::getTypeName).collect(Collectors.toSet()); + for (String column : distinctColumns) { + if (!schema.hasField(column)) { + mismatchColumns.add(column); + continue; + } + TypeName typeName = schema.getField(column).getType().getTypeName(); + if (!validTypeNames.contains(typeName)) { + invalidTypes.add(column); + } + } + + checkArgument( + mismatchColumns.isEmpty(), + "columns in CSVFormat header do not exist in Schema: %s", + String.join(",", mismatchColumns)); + checkArgument( + invalidTypes.isEmpty(), + "columns in header match fields in Schema with invalid types: %s. 
See CsvIO#VALID_FIELD_TYPE_SET for a list of valid field types.", + String.join(",", invalidTypes)); + } +} diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java new file mode 100644 index 00000000000..4cab7a9abf4 --- /dev/null +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing CSV files. */ +package org.apache.beam.sdk.io.csv; diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestData.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestData.java new file mode 100644 index 00000000000..e1048b195d2 --- /dev/null +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestData.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.util.Objects.requireNonNull; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.allPrimitiveDataTypes; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.allPrimitiveDataTypesToRowFn; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.nullableAllPrimitiveDataTypes; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.nullableAllPrimitiveDataTypesToRowFn; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.timeContaining; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.timeContainingToRowFn; + +import java.math.BigDecimal; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; + +/** Shared data for use in {@link CsvIO} tests and related classes. 
*/ +class CsvIOTestData { + static final CsvIOTestData DATA = new CsvIOTestData(); + + private CsvIOTestData() {} + + final Row allPrimitiveDataTypesRow = + requireNonNull( + allPrimitiveDataTypesToRowFn() + .apply( + allPrimitiveDataTypes( + false, (byte) 1, BigDecimal.TEN, 1.0, 1.0f, (short) 1.0, 1, 1L, "a"))); + + final Row allPrimitiveDataTypesRowWithPadding = + requireNonNull( + allPrimitiveDataTypesToRowFn() + .apply( + allPrimitiveDataTypes( + false, + (byte) 1, + BigDecimal.TEN, + 1.0, + 1.0f, + (short) 1.0, + 1, + 1L, + " a "))); + + final List<Row> allPrimitiveDataTypeRows = + Stream.of( + allPrimitiveDataTypes( + false, (byte) 1, BigDecimal.TEN, 1.0, 1.0f, (short) 1.0, 1, 1L, "a"), + allPrimitiveDataTypes( + false, + (byte) 2, + BigDecimal.TEN.add(BigDecimal.TEN), + 2.0, + 2.0f, + (short) 2.0, + 2, + 2L, + "b"), + allPrimitiveDataTypes( + false, + (byte) 3, + BigDecimal.TEN.add(BigDecimal.TEN).add(BigDecimal.TEN), + 3.0, + 3.0f, + (short) 3.0, + 3, + 3L, + "c")) + .map(allPrimitiveDataTypesToRowFn()::apply) + .collect(Collectors.toList()); + + final Row nullableTypesRowAllNull = + requireNonNull( + nullableAllPrimitiveDataTypesToRowFn() + .apply(nullableAllPrimitiveDataTypes(null, null, null, null, null, null))); + + final Row nullableTypesRowSomeNull = + requireNonNull( + nullableAllPrimitiveDataTypesToRowFn() + .apply(nullableAllPrimitiveDataTypes(true, null, null, 1, null, "a"))); + + final Row timeContainingRow = + timeContainingToRowFn() + .apply( + timeContaining( + Instant.ofEpochMilli(1L), Collections.singletonList(Instant.ofEpochMilli(1L)))); +} diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java new file mode 100644 index 00000000000..c7ccf079f33 --- /dev/null +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOTestJavaBeans.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.util.Objects.requireNonNull; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Instant; + +// TODO(https://github.com/apache/beam/issues/24980): replace with common schema-aware classes; see +// task description. +/** Classes and data to drive CsvIO tests. */ +class CsvIOTestJavaBeans { + + private static final AutoValueSchema DEFAULT_SCHEMA_PROVIDER = new AutoValueSchema(); + + /** Convenience method for {@link AllPrimitiveDataTypes} instantiation. 
*/ + static AllPrimitiveDataTypes allPrimitiveDataTypes( + Boolean aBoolean, + Byte aByte, + BigDecimal aDecimal, + Double aDouble, + Float aFloat, + Short aShort, + Integer anInteger, + Long aLong, + String aString) { + return new AutoValue_CsvIOTestJavaBeans_AllPrimitiveDataTypes.Builder() + .setABoolean(aBoolean) + .setAByte(aByte) + .setADecimal(aDecimal) + .setADouble(aDouble) + .setAFloat(aFloat) + .setAShort(aShort) + .setAnInteger(anInteger) + .setALong(aLong) + .setAString(aString) + .build(); + } + + /** Convenience method for {@link NullableAllPrimitiveDataTypes} instantiation. */ + static NullableAllPrimitiveDataTypes nullableAllPrimitiveDataTypes( + @Nullable Boolean aBoolean, + @Nullable Double aDouble, + @Nullable Float aFloat, + @Nullable Integer anInteger, + @Nullable Long aLong, + @Nullable String aString) { + return new AutoValue_CsvIOTestJavaBeans_NullableAllPrimitiveDataTypes.Builder() + .setABoolean(aBoolean) + .setADouble(aDouble) + .setAFloat(aFloat) + .setAnInteger(anInteger) + .setALong(aLong) + .setAString(aString) + .build(); + } + + /** Convenience method for {@link TimeContaining} instantiation. */ + static TimeContaining timeContaining(Instant instant, List<Instant> instantList) { + return new AutoValue_CsvIOTestJavaBeans_TimeContaining.Builder() + .setInstant(instant) + .setInstantList(instantList) + .build(); + } + + private static final TypeDescriptor<AllPrimitiveDataTypes> + ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(AllPrimitiveDataTypes.class); + + /** The schema for {@link AllPrimitiveDataTypes}. */ + static final Schema ALL_PRIMITIVE_DATA_TYPES_SCHEMA = + requireNonNull(DEFAULT_SCHEMA_PROVIDER.schemaFor(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR)); + + /** + * Returns a {@link SerializableFunction} to convert from a {@link AllPrimitiveDataTypes} to a + * {@link Row}. 
+ */ + static SerializableFunction<AllPrimitiveDataTypes, Row> allPrimitiveDataTypesToRowFn() { + return DEFAULT_SCHEMA_PROVIDER.toRowFunction(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR); + } + + private static final TypeDescriptor<NullableAllPrimitiveDataTypes> + NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = + TypeDescriptor.of(NullableAllPrimitiveDataTypes.class); + + /** The schema for {@link NullableAllPrimitiveDataTypes}. */ + static final Schema NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA = + requireNonNull( + DEFAULT_SCHEMA_PROVIDER.schemaFor(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR)); + + /** + * Returns a {@link SerializableFunction} to convert from a {@link NullableAllPrimitiveDataTypes} + * to a {@link Row}. + */ + static SerializableFunction<NullableAllPrimitiveDataTypes, Row> + nullableAllPrimitiveDataTypesToRowFn() { + return DEFAULT_SCHEMA_PROVIDER.toRowFunction(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR); + } + + private static final TypeDescriptor<TimeContaining> TIME_CONTAINING_TYPE_DESCRIPTOR = + TypeDescriptor.of(TimeContaining.class); + + /** The schema for {@link TimeContaining}. */ + static final Schema TIME_CONTAINING_SCHEMA = + requireNonNull(DEFAULT_SCHEMA_PROVIDER.schemaFor(TIME_CONTAINING_TYPE_DESCRIPTOR)); + + /** + * Returns a {@link SerializableFunction} to convert from a {@link TimeContaining} to a {@link + * Row}. + */ + static SerializableFunction<TimeContaining, Row> timeContainingToRowFn() { + return DEFAULT_SCHEMA_PROVIDER.toRowFunction(TIME_CONTAINING_TYPE_DESCRIPTOR); + } + + /** + * Contains all primitive Java types i.e. String, Integer, etc and {@link BigDecimal}. The purpose + * of this class is to test schema-aware PTransforms with flat {@link Schema} {@link Row}s. 
+ */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + abstract static class AllPrimitiveDataTypes implements Serializable { + + abstract Boolean getABoolean(); + + abstract Byte getAByte(); + + abstract BigDecimal getADecimal(); + + abstract Double getADouble(); + + abstract Float getAFloat(); + + abstract Short getAShort(); + + abstract Integer getAnInteger(); + + abstract Long getALong(); + + abstract String getAString(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setABoolean(Boolean value); + + abstract Builder setAByte(Byte value); + + abstract Builder setADecimal(BigDecimal value); + + abstract Builder setADouble(Double value); + + abstract Builder setAFloat(Float value); + + abstract Builder setAShort(Short value); + + abstract Builder setAnInteger(Integer value); + + abstract Builder setALong(Long value); + + abstract Builder setAString(String value); + + abstract AllPrimitiveDataTypes build(); + } + } + + /** + * Contains all nullable primitive Java types i.e. String, Integer, etc and {@link BigDecimal}. + * The purpose of this class is to test schema-aware PTransforms with flat {@link Schema} {@link + * Row}s. 
+ */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + abstract static class NullableAllPrimitiveDataTypes implements Serializable { + + @Nullable + abstract Boolean getABoolean(); + + @Nullable + abstract Double getADouble(); + + @Nullable + abstract Float getAFloat(); + + @Nullable + abstract Integer getAnInteger(); + + @Nullable + abstract Long getALong(); + + @Nullable + abstract String getAString(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setABoolean(Boolean value); + + abstract Builder setADouble(Double value); + + abstract Builder setAFloat(Float value); + + abstract Builder setAnInteger(Integer value); + + abstract Builder setALong(Long value); + + abstract Builder setAString(String value); + + abstract NullableAllPrimitiveDataTypes build(); + } + } + + /** + * Contains time-related types. The purpose of this class is to test schema-aware PTransforms with + * time-related {@link Schema.FieldType} containing {@link Row}s. + */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + abstract static class TimeContaining { + + abstract Instant getInstant(); + + abstract List<Instant> getInstantList(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setInstant(Instant value); + + abstract Builder setInstantList(List<Instant> value); + + abstract TimeContaining build(); + } + } +} diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOWriteTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOWriteTest.java new file mode 100644 index 00000000000..dc83007a2a9 --- /dev/null +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOWriteTest.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import static java.util.Objects.requireNonNull; +import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.AllPrimitiveDataTypes; +import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.allPrimitiveDataTypes; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; +import org.apache.commons.csv.CSVFormat; +import 
org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link CsvIO.Write}. */ +@RunWith(JUnit4.class) +public class CsvIOWriteTest { + @Rule public TestPipeline writePipeline = TestPipeline.create(); + + @Rule public TestPipeline readPipeline = TestPipeline.create(); + + @Rule + public TestPipeline errorPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void headersWithCommentsWrittenFirstOnEachShard() { + File folder = + createFolder( + AllPrimitiveDataTypes.class.getSimpleName(), + "headersWithCommentsWrittenFirstOnEachShard"); + + PCollection<Row> input = + writePipeline.apply( + Create.of(DATA.allPrimitiveDataTypeRows) + .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)); + String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger"; + CSVFormat csvFormat = + CSVFormat.DEFAULT.withHeaderComments("foo", "bar", "baz").withCommentMarker('#'); + + input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(3)); + writePipeline.run().waitUntilFinish(); + + PCollection<FileIO.ReadableFile> files = + readPipeline + .apply(FileIO.match().filepattern(toFilenamePrefix(folder) + "*")) + .apply(FileIO.readMatches()); + PAssert.that(files) + .satisfies( + (Iterable<FileIO.ReadableFile> itr) -> { + Iterable<FileIO.ReadableFile> safeItr = requireNonNull(itr); + for (FileIO.ReadableFile file : safeItr) { + try { + List<String> lines = Splitter.on('\n').splitToList(file.readFullyAsUTF8String()); + assertFalse(lines.isEmpty()); + assertEquals("# foo", lines.get(0)); + assertEquals("# bar", lines.get(1)); + assertEquals("# baz", lines.get(2)); + assertEquals(expectedHeader, lines.get(3)); + + assertTrue( + lines.subList(4, lines.size()).stream().noneMatch(expectedHeader::equals)); + } catch (IOException e) { + 
fail(e.getMessage()); + } + } + return null; + }); + + readPipeline.run(); + } + + @Test + public void headersWrittenFirstOnEachShard() { + File folder = + createFolder(AllPrimitiveDataTypes.class.getSimpleName(), "headersWrittenFirstOnEachShard"); + + PCollection<Row> input = + writePipeline.apply( + Create.of(DATA.allPrimitiveDataTypeRows) + .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)); + String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger"; + CSVFormat csvFormat = CSVFormat.DEFAULT; + + input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(3)); + writePipeline.run().waitUntilFinish(); + + PCollection<FileIO.ReadableFile> files = + readPipeline + .apply(FileIO.match().filepattern(toFilenamePrefix(folder) + "*")) + .apply(FileIO.readMatches()); + PAssert.that(files) + .satisfies( + (Iterable<FileIO.ReadableFile> itr) -> { + Iterable<FileIO.ReadableFile> safeItr = requireNonNull(itr); + for (FileIO.ReadableFile file : safeItr) { + try { + List<String> lines = Splitter.on('\n').splitToList(file.readFullyAsUTF8String()); + assertFalse(lines.isEmpty()); + assertEquals(expectedHeader, lines.get(0)); + assertTrue( + lines.subList(1, lines.size()).stream().noneMatch(expectedHeader::equals)); + } catch (IOException e) { + fail(e.getMessage()); + } + } + return null; + }); + + readPipeline.run(); + } + + @Test + public void writesUserDefinedTypes() { + File folder = + createFolder(AllPrimitiveDataTypes.class.getSimpleName(), "writesUserDefinedTypes"); + + PCollection<AllPrimitiveDataTypes> input = + writePipeline.apply( + Create.of( + allPrimitiveDataTypes( + false, (byte) 1, BigDecimal.TEN, 1.0, 1.0f, (short) 1.0, 1, 1L, "a"), + allPrimitiveDataTypes( + false, + (byte) 2, + BigDecimal.TEN.add(BigDecimal.TEN), + 2.0, + 2.0f, + (short) 2.0, + 2, + 2L, + "b"), + allPrimitiveDataTypes( + false, + (byte) 3, + BigDecimal.TEN.add(BigDecimal.TEN).add(BigDecimal.TEN), + 3.0, + 3.0f, + (short) 3.0, + 3, + 3L, + 
"c"))); + + String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger"; + CSVFormat csvFormat = CSVFormat.DEFAULT; + input.apply( + CsvIO.<AllPrimitiveDataTypes>write(toFilenamePrefix(folder), csvFormat).withNumShards(1)); + + writePipeline.run().waitUntilFinish(); + + PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*"))) + .containsInAnyOrder( + expectedHeader, + "false,1,10,1.0,1.0,1,1,a,1", + "false,2,20,2.0,2.0,2,2,b,2", + "false,3,30,3.0,3.0,3,3,c,3"); + + readPipeline.run(); + } + + @Test + public void nonNullCSVFormatHeaderWritesSelectedSchemaFields() { + File folder = + createFolder( + AllPrimitiveDataTypes.class.getSimpleName(), + "nonNullCSVFormatHeaderWritesSelectedSchemaFields"); + PCollection<Row> input = + writePipeline.apply( + Create.of(DATA.allPrimitiveDataTypeRows) + .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)); + + String expectedHeader = "aFloat,aString,aDecimal"; + + CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader("aFloat", "aString", "aDecimal"); + input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(1)); + writePipeline.run().waitUntilFinish(); + + PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*"))) + .containsInAnyOrder(expectedHeader, "1.0,a,10", "2.0,b,20", "3.0,c,30"); + + readPipeline.run(); + } + + @Test + public void withSkipHeaderRecordOnlyWritesRows() { + File folder = + createFolder( + AllPrimitiveDataTypes.class.getSimpleName(), + "withSkipHeaderRecordWritesCsvFilesWithoutHeaders"); + + CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(); + + PCollection<Row> input = + writePipeline.apply( + Create.of(DATA.allPrimitiveDataTypeRows) + .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)); + + input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(1)); + + writePipeline.run().waitUntilFinish(); + + PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) 
+ "*"))) + .containsInAnyOrder( + "false,1,10,1.0,1.0,1,1,a,1", + "false,2,20,2.0,2.0,2,2,b,2", + "false,3,30,3.0,3.0,3,3,c,3"); + + readPipeline.run(); + } + + @Test + public void nullCSVFormatHeaderWritesAllSchemaFields() { + File folder = + createFolder( + AllPrimitiveDataTypes.class.getSimpleName(), + "nullCSVFormatHeaderWritesAllSchemaFields"); + + PCollection<Row> input = + writePipeline.apply( + Create.of(DATA.allPrimitiveDataTypeRows) + .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)); + String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger"; + CSVFormat csvFormat = CSVFormat.DEFAULT; + + input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(1)); + + writePipeline.run().waitUntilFinish(); + + PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*"))) + .containsInAnyOrder( + expectedHeader, + "false,1,10,1.0,1.0,1,1,a,1", + "false,2,20,2.0,2.0,2,2,b,2", + "false,3,30,3.0,3.0,3,3,c,3"); + + readPipeline.run(); + } + + @Test + public void nonNullHeaderCommentsRequiresHeaderMarker() { + PCollection<Row> input = + errorPipeline.apply( + Create.of(DATA.allPrimitiveDataTypeRows) + .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)); + IllegalArgumentException nullHeaderMarker = + assertThrows( + IllegalArgumentException.class, + () -> + input.apply( + CsvIO.writeRows( + "somewhere", + CSVFormat.DEFAULT.withHeaderComments("some", "header", "comments")))); + + assertEquals( + "CSVFormat withCommentMarker required when withHeaderComments", + nullHeaderMarker.getMessage()); + } + + private static String toFilenamePrefix(File folder) { + checkArgument(folder.isDirectory()); + return folder.getAbsolutePath() + "/out"; + } + + private File createFolder(String... 
paths) { + try { + return tempFolder.newFolder(paths); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvCSVFormatTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvCSVFormatTest.java new file mode 100644 index 00000000000..645710268eb --- /dev/null +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvCSVFormatTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link org.apache.commons.csv.CSVFormat} settings in the context of {@link
+ * CsvRowConversions.RowToCsv}.
+ */ +@RunWith(JUnit4.class) +public class RowToCsvCSVFormatTest { + @Test + public void invalidCSVFormatHeader() { + NullPointerException nullHeaderError = + assertThrows( + NullPointerException.class, + () -> + CsvRowConversions.RowToCsv.builder() + .setCSVFormat(CSVFormat.DEFAULT) + .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .build()); + assertEquals("CSVFormat withHeader is required", nullHeaderError.getMessage()); + + IllegalArgumentException emptyHeaderError = + assertThrows( + IllegalArgumentException.class, + () -> + CsvRowConversions.RowToCsv.builder() + .setCSVFormat(CSVFormat.DEFAULT.withHeader()) + .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .build()); + + assertEquals( + "CSVFormat withHeader requires at least one column", emptyHeaderError.getMessage()); + + IllegalArgumentException mismatchHeaderError = + assertThrows( + IllegalArgumentException.class, + () -> + CsvRowConversions.RowToCsv.builder() + .setCSVFormat( + CSVFormat.DEFAULT.withHeader("aString", "idontexist1", "idontexist2")) + .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .build()); + + assertEquals( + "columns in CSVFormat header do not exist in Schema: idontexist2,idontexist1", + mismatchHeaderError.getMessage()); + } + + @Test + public void invalidSchema() { + IllegalArgumentException emptySchemaError = + assertThrows( + IllegalArgumentException.class, + () -> + CsvRowConversions.RowToCsv.builder() + .setCSVFormat(CSVFormat.DEFAULT.withHeader()) + .setSchema(Schema.of()) + .build()); + + assertEquals("Schema has no fields", emptySchemaError.getMessage()); + + IllegalArgumentException invalidArrayFieldsError = + assertThrows( + IllegalArgumentException.class, + () -> + CsvRowConversions.RowToCsv.builder() + .setCSVFormat(CSVFormat.DEFAULT.withHeader("instant", "instantList")) + .setSchema(TIME_CONTAINING_SCHEMA) + .build()); + + assertEquals( + "columns in header match fields in Schema with invalid types: instantList. 
See CsvIO#VALID_FIELD_TYPE_SET for a list of valid field types.", + invalidArrayFieldsError.getMessage()); + + // Should not throw Exception when limited to valid fields. + CsvRowConversions.RowToCsv.builder() + .setCSVFormat(CSVFormat.DEFAULT.withHeader("instant")) + .setSchema(TIME_CONTAINING_SCHEMA) + .build(); + } + + @Test + public void withAllowDuplicateHeaderNamesDuplicatesRowFieldOutput() { + assertEquals( + "allowDuplicateHeaderNames=true", + "a,a,a", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withAllowDuplicateHeaderNames(true) + .withHeader("aString", "aString", "aString"))); + } + + @Test + public void withAllowMissingColumnNamesSettingThrowsException() { + IllegalArgumentException exception = + assertThrows( + "allowMissingColumnNames=true", + IllegalArgumentException.class, + () -> + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withAllowMissingColumnNames(true))); + + assertEquals( + "withAllowMissingColumnNames is an illegal CSVFormat setting", exception.getMessage()); + } + + @Test + public void withAutoFlushThrowsException() { + IllegalArgumentException exception = + assertThrows( + "autoFlush=true", + IllegalArgumentException.class, + () -> + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withAutoFlush(true))); + + assertEquals("withAutoFlush is an illegal CSVFormat setting", exception.getMessage()); + } + + @Test + public void withCommentMarkerDoesNotEffectConversion() { + Schema schema = Schema.of(Field.of("aString", FieldType.STRING)); + Row row = Row.withSchema(schema).attachValues("$abc"); + assertEquals("$abc", rowToCsv(row, csvFormat(schema).withCommentMarker('$'))); + assertEquals("$abc", rowToCsv(row, csvFormat(schema).withCommentMarker(null))); + } + + @Test + public void withDelimiterDrivesCellBorders() { + assertEquals( + "false~1~10~1.0~1.0~1~1~a~1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + 
csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withDelimiter('~'))); + } + + @Test + public void withEscapeDrivesOutput() { + Schema schema = + Schema.of(Field.of("aString", FieldType.STRING), Field.of("anInt", FieldType.INT32)); + Row row = Row.withSchema(schema).attachValues(",a", 1); + String[] header = new String[] {"anInt", "aString"}; + assertEquals( + "1,#,a", + rowToCsv( + row, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withHeader(header) + .withEscape('#') + .withQuoteMode(QuoteMode.NONE))); + assertEquals( + "1,\",a\"", rowToCsv(row, csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withHeader(header))); + } + + @Test + public void withHeaderDrivesFieldOrderSubsetOutput() { + assertEquals( + "1,false,a", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withHeader("anInteger", "aBoolean", "aString"))); + } + + @Test + public void withHeaderCommentsDoesNotEffectConversion() { + assertEquals( + "false,1,10,1.0,1.0,1,1,a,1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withHeaderComments("some", "header", "comments"))); + } + + @Test + public void withIgnoreEmptyLinesDoesNotEffectOutput() { + assertEquals( + "false,1,10,1.0,1.0,1,1,a,1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreEmptyLines(true))); + assertEquals( + "false,1,10,1.0,1.0,1,1,a,1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreEmptyLines(false))); + } + + @Test + public void withIgnoreHeaderCaseThrowsException() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreHeaderCase(true))); + assertEquals("withIgnoreHeaderCase is an illegal CSVFormat setting", exception.getMessage()); + } + + @Test + public void withIgnoreSurroundingSpacesThrowsException() { + 
IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreSurroundingSpaces(true))); + assertEquals( + "withIgnoreSurroundingSpaces is an illegal CSVFormat setting", exception.getMessage()); + } + + @Test + public void withNullStringReplacesNullValues() { + assertEquals( + "🦄,🦄,🦄,🦄,🦄,🦄", + rowToCsv( + DATA.nullableTypesRowAllNull, + csvFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withNullString("🦄"))); + } + + @Test + public void withQuoteDrivesConversion() { + assertEquals( + "@false@,1,10,1.0,1.0,1,1,@a@,1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withQuote('@') + .withQuoteMode(QuoteMode.NON_NUMERIC))); + } + + @Test + public void withQuoteModeDrivesCellBoundaries() { + assertEquals( + "\"false\",\"1\",\"10\",\"1.0\",\"1.0\",\"1\",\"1\",\"a\",\"1\"", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withQuoteMode(QuoteMode.ALL))); + assertEquals( + "\"false\",1,10,1.0,1.0,1,1,\"a\",1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withQuoteMode(QuoteMode.NON_NUMERIC))); + assertEquals( + "false,1,10,1.0,1.0,1,1,a,1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withQuoteMode(QuoteMode.MINIMAL))); + assertEquals( + ",,,,,", + rowToCsv( + DATA.nullableTypesRowAllNull, + csvFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withQuoteMode(QuoteMode.ALL_NON_NULL))); + assertEquals( + "\"true\",,,,\"a\",\"1\"", + rowToCsv( + DATA.nullableTypesRowSomeNull, + csvFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withQuoteMode(QuoteMode.ALL_NON_NULL))); + assertEquals( + "false,1,10,1.0,1.0,1,1,a,1", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA) + .withEscape('#') + .withQuoteMode(QuoteMode.NONE))); + } + + 
@Test + public void withSystemRecordSeparatorDoesNotEffectOutput() { + assertEquals( + rowToCsv(DATA.allPrimitiveDataTypesRow, csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)), + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withSystemRecordSeparator())); + } + + @Test + public void withTrailingDelimiterAppendsToLineEnd() { + assertEquals( + "false,1,10,1.0,1.0,1,1,a,1,", + rowToCsv( + DATA.allPrimitiveDataTypesRow, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withTrailingDelimiter(true))); + } + + @Test + public void withTrimRemovesCellPadding() { + assertEquals( + "false,1,10,1.0,1.0,1,1,\" a \",1", + rowToCsv( + DATA.allPrimitiveDataTypesRowWithPadding, csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA))); + assertEquals( + "false,1,10,1.0,1.0,1,1,a,1", + rowToCsv( + DATA.allPrimitiveDataTypesRowWithPadding, + csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withTrim(true))); + } + + private static SerializableFunction<Row, String> rowToCsvFn(Schema schema, CSVFormat csvFormat) { + return CsvRowConversions.RowToCsv.builder().setCSVFormat(csvFormat).setSchema(schema).build(); + } + + private static String rowToCsv(Row row, CSVFormat csvFormat) { + Schema schema = checkNotNull(row.getSchema()); + return rowToCsvFn(schema, csvFormat).apply(row); + } + + private static CSVFormat csvFormat(Schema schema) { + return CSVFormat.DEFAULT.withHeader(schema.sorted().getFieldNames().toArray(new String[0])); + } +} diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvPredefinedCSVFormatsTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvPredefinedCSVFormatsTest.java new file mode 100644 index 00000000000..91dcaec322b --- /dev/null +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvPredefinedCSVFormatsTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ * See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.csv.CsvRowConversions.RowToCsv;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link org.apache.beam.sdk.io.csv.CsvRowConversions.RowToCsv} with {@link
+ * org.apache.commons.csv.CSVFormat.Predefined} types.
+ *
+ * <p>Each test covers the same four fixtures: all primitives, all-null nullables, some-null
+ * nullables, and a time-containing row restricted to its "instant" column.
+ */
+@RunWith(JUnit4.class)
+public class RowToCsvPredefinedCSVFormatsTest {
+  @Test
+  public void defaultFormat() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        defaultFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        defaultFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        defaultFormat(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        defaultFormat(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void excel() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        excel(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        excel(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        excel(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        excel(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  // INFORMIX_UNLOAD uses '|' as the delimiter.
+  @Test
+  public void informixUnload() {
+    assertEquals(
+        "false|1|10|1.0|1.0|1|1|a|1",
+        informixUnload(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "|||||",
+        informixUnload(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true||||a|1",
+        informixUnload(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        informixUnload(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void informixUnloadCsv() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        informixUnloadCSV(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        informixUnloadCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        informixUnloadCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        informixUnloadCSV(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  // MYSQL is tab-delimited and renders null as \N.
+  @Test
+  public void mySql() {
+    assertEquals(
+        "false\t1\t10\t1.0\t1.0\t1\t1\ta\t1",
+        mySql(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "\\N\t\\N\t\\N\t\\N\t\\N\t\\N",
+        mySql(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true\t\\N\t\\N\t\\N\ta\t1",
+        mySql(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        mySql(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void rfc4180() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rfc4180(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        rfc4180(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,,,,a,1",
+        rfc4180(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        rfc4180(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  // ORACLE is comma-delimited but, like MYSQL, renders null as \N.
+  @Test
+  public void oracle() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        oracle(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "\\N,\\N,\\N,\\N,\\N,\\N",
+        oracle(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true,\\N,\\N,\\N,a,1",
+        oracle(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        oracle(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  // POSTGRESQL_CSV quotes every non-null cell and leaves nulls empty.
+  @Test
+  public void postgresqlCSV() {
+    assertEquals(
+        "\"false\",\"1\",\"10\",\"1.0\",\"1.0\",\"1\",\"1\",\"a\",\"1\"",
+        postgresqlCSV(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        ",,,,,",
+        postgresqlCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "\"true\",,,,\"a\",\"1\"",
+        postgresqlCSV(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "\"1970-01-01T00:00:00.001Z\"",
+        postgresqlCSV(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  // POSTGRESQL_TEXT is tab-delimited, quotes non-null cells, and renders null as \N.
+  @Test
+  public void postgresqlText() {
+    assertEquals(
+        "\"false\"\t\"1\"\t\"10\"\t\"1.0\"\t\"1.0\"\t\"1\"\t\"1\"\t\"a\"\t\"1\"",
+        postgresqlText(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "\\N\t\\N\t\\N\t\\N\t\\N\t\\N",
+        postgresqlText(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "\"true\"\t\\N\t\\N\t\\N\t\"a\"\t\"1\"",
+        postgresqlText(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+            .apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "\"1970-01-01T00:00:00.001Z\"",
+        postgresqlText(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  @Test
+  public void tdf() {
+    assertEquals(
+        "false\t1\t10\t1.0\t1.0\t1\t1\ta\t1",
+        tdf(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.allPrimitiveDataTypesRow));
+
+    assertEquals(
+        "", tdf(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowAllNull));
+
+    assertEquals(
+        "true\t\t\t\ta\t1",
+        tdf(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA).apply(DATA.nullableTypesRowSomeNull));
+
+    assertEquals(
+        "1970-01-01T00:00:00.001Z",
+        tdf(TIME_CONTAINING_SCHEMA, "instant").apply(DATA.timeContainingRow));
+  }
+
+  // One factory per CSVFormat.Predefined under test; each delegates to rowToCsv below.
+  private static RowToCsv defaultFormat(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.DEFAULT, schema, header);
+  }
+
+  private static RowToCsv excel(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.EXCEL.withAllowMissingColumnNames(false), schema, header);
+  }
+
+  private static RowToCsv informixUnload(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.INFORMIX_UNLOAD, schema, header);
+  }
+
+  private static RowToCsv informixUnloadCSV(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.INFORMIX_UNLOAD_CSV, schema, header);
+  }
+
+  private static RowToCsv mySql(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.MYSQL, schema, header);
+  }
+
+  private static RowToCsv rfc4180(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.RFC4180, schema, header);
+  }
+
+  private static RowToCsv oracle(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.ORACLE, schema, header);
+  }
+
+  private static RowToCsv postgresqlCSV(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.POSTGRESQL_CSV, schema, header);
+  }
+
+  private static RowToCsv postgresqlText(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.POSTGRESQL_TEXT, schema, header);
+  }
+
+  private static RowToCsv tdf(Schema schema, String... header) {
+    return rowToCsv(CSVFormat.TDF.withIgnoreSurroundingSpaces(false), schema, header);
+  }
+
+  // Builds the converter; when no explicit header is given, defaults to the schema's
+  // field names in sorted order (matching RowToCsvCSVFormatTest's csvFormat helper).
+  private static RowToCsv rowToCsv(CSVFormat csvFormat, Schema schema, String... header) {
+    if (header.length == 0) {
+      header = schema.sorted().getFieldNames().toArray(new String[0]);
+    }
+    return RowToCsv.builder().setCSVFormat(csvFormat.withHeader(header)).setSchema(schema).build();
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 454ba5cb64b..4455981a3fd 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -184,6 +184,7 @@ include(":sdks:java:io:expansion-service")
 include(":sdks:java:io:file-based-io-tests")
 include(":sdks:java:io:bigquery-io-perf-tests")
 include(":sdks:java:io:cdap")
+include(":sdks:java:io:csv")
 include(":sdks:java:io:fileschematransform")
 include(":sdks:java:io:google-cloud-platform")
 include(":sdks:java:io:google-cloud-platform:expansion-service")