This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch release-2.56.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.56.0 by this push:
     new d07cc622c80 Simplify intermediate data in Iceberg sink; use manifest files (#31090)
d07cc622c80 is described below

commit d07cc622c807d1078437ddbd931feefe06dfb52c
Author: Kenneth Knowles <k...@apache.org>
AuthorDate: Fri Apr 26 06:55:58 2024 -0400

    Simplify intermediate data in Iceberg sink; use manifest files (#31090)
---
 sdks/java/io/iceberg/build.gradle                  |   1 -
 .../beam/sdk/io/iceberg/AppendFilesToTables.java   |   2 +-
 .../beam/sdk/io/iceberg/FileWriteResult.java       | 210 ++++-----------------
 .../apache/beam/sdk/io/iceberg/RecordWriter.java   |  20 +-
 .../sdk/io/iceberg/WriteGroupedRowsToFiles.java    |   3 +-
 .../sdk/io/iceberg/WriteUngroupedRowsToFiles.java  |   5 +-
 .../beam/sdk/io/iceberg/FileWriteResultTest.java   | 166 ----------------
 7 files changed, 61 insertions(+), 346 deletions(-)

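For readers skimming the patch: the change stores each written data file's manifest as serialized bytes inside FileWriteResult and rebuilds the ManifestFile on the committing worker. Below is a minimal sketch of that round-trip, assuming only the Iceberg ManifestFiles.encode/decode calls that appear in the diff; the class and method names in the sketch are illustrative, not part of the commit.

    import java.io.IOException;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;

    class ManifestRoundTripSketch {
      // Serialize a manifest so it can travel through the pipeline as plain bytes,
      // mirroring FileWriteResult.Builder#setManifestFile in this patch.
      static byte[] toBytes(ManifestFile manifestFile) throws IOException {
        return ManifestFiles.encode(manifestFile);
      }

      // Rebuild the ManifestFile before handing it to AppendFiles#appendManifest,
      // mirroring FileWriteResult#getManifestFile in this patch.
      static ManifestFile fromBytes(byte[] manifestFileBytes) throws IOException {
        return ManifestFiles.decode(manifestFileBytes);
      }
    }
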
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index e721b98f102..f82284e3b39 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -52,7 +52,6 @@ dependencies {
     implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
     implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
     implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
-    implementation library.java.avro
     implementation library.java.hadoop_common
 
     testImplementation library.java.hadoop_client
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index e4ba6000182..bb42df5a933 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -90,7 +90,7 @@ class AppendFilesToTables
      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
       AppendFiles update = table.newAppend();
       for (FileWriteResult writtenFile : element.getValue()) {
-        update.appendFile(writtenFile.getDataFile());
+        update.appendManifest(writtenFile.getManifestFile());
       }
       update.commit();
       out.outputWithTimestamp(
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
index c12febc03f4..2459c0befde 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
@@ -17,197 +17,69 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.avro.AvroEncoderUtil;
-import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 @AutoValue
-@DefaultCoder(FileWriteResult.FileWriteResultCoder.class)
+@DefaultSchema(AutoValueSchema.class)
 abstract class FileWriteResult {
-  public abstract TableIdentifier getTableIdentifier();
 
-  public abstract PartitionSpec getPartitionSpec();
+  private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
+  private transient @MonotonicNonNull ManifestFile cachedManifestFile;
 
-  public abstract DataFile getDataFile();
+  abstract String getTableIdentifierString();
 
-  public static Builder builder() {
-    return new AutoValue_FileWriteResult.Builder();
-  }
+  @SuppressWarnings("mutable")
+  abstract byte[] getManifestFileBytes();
 
-  @AutoValue.Builder
-  abstract static class Builder {
-    public abstract Builder setTableIdentifier(TableIdentifier tableId);
-
-    public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);
-
-    public abstract Builder setDataFile(DataFile dataFiles);
-
-    public abstract FileWriteResult build();
-  }
-
-  public static class FileWriteResultCoder extends StructuredCoder<FileWriteResult> {
-    static final int VERSION = 0;
-    private static final FileWriteResultCoder SINGLETON = new FileWriteResultCoder();
-
-    private static final Coder<String> tableIdentifierCoder = StringUtf8Coder.of();
-    private static final Coder<PartitionSpec> partitionSpecCoder =
-        SerializableCoder.of(PartitionSpec.class);
-    private static final Coder<byte[]> dataFileBytesCoder = ByteArrayCoder.of();
-
-    private static Schema getDataFileAvroSchema(FileWriteResult fileWriteResult) {
-      Types.StructType partitionType = fileWriteResult.getPartitionSpec().partitionType();
-      Types.StructType dataFileStruct = DataFile.getType(partitionType);
-      Map<Types.StructType, String> dataFileNames =
-          ImmutableMap.of(
-              dataFileStruct, "org.apache.iceberg.GenericDataFile",
-              partitionType, "org.apache.iceberg.PartitionData");
-      return AvroSchemaUtil.convert(dataFileStruct, dataFileNames);
-    }
-
-    @Override
-    public void encode(FileWriteResult value, OutputStream outStream)
-        throws CoderException, IOException {
-      // "version" of this coder.
-      // If breaking changes are introduced (e.g. from Beam, Iceberg, Avro, etc..),
-      // then update this version and create a fork in decode() below for the new decode logic.
-      // This helps keep the pipeline update-compatible
-      outStream.write(VERSION);
-
-      tableIdentifierCoder.encode(value.getTableIdentifier().toString(), outStream);
-      partitionSpecCoder.encode(value.getPartitionSpec(), outStream);
-      dataFileBytesCoder.encode(
-          AvroEncoderUtil.encode(value.getDataFile(), getDataFileAvroSchema(value)), outStream);
-    }
-
-    @Override
-    public FileWriteResult decode(InputStream inStream) throws CoderException, IOException {
-      // Forking logic can be added here depending on the version of this coder
-      assert inStream.read() == 0;
-
-      TableIdentifier tableId = TableIdentifier.parse(tableIdentifierCoder.decode(inStream));
-      PartitionSpec partitionSpec = partitionSpecCoder.decode(inStream);
-      DataFile dataFile =
-          checkArgumentNotNull(
-              AvroEncoderUtil.decode(dataFileBytesCoder.decode(inStream)),
-              "Decoding of dataFile resulted in null");
-      return FileWriteResult.builder()
-          .setTableIdentifier(tableId)
-          .setDataFile(dataFile)
-          .setPartitionSpec(partitionSpec)
-          .build();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public Object structuralValue(FileWriteResult fileWriteResult) {
-      return new FileWriteResultDeepEqualityWrapper(fileWriteResult);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {}
-
-    @Override
-    public TypeDescriptor<FileWriteResult> getEncodedTypeDescriptor() {
-      return TypeDescriptor.of(FileWriteResult.class);
+  @SchemaIgnore
+  public TableIdentifier getTableIdentifier() {
+    if (cachedTableIdentifier == null) {
+      cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
     }
+    return cachedTableIdentifier;
+  }
 
-    public static FileWriteResultCoder of() {
-      return SINGLETON;
+  @SchemaIgnore
+  public ManifestFile getManifestFile() {
+    if (cachedManifestFile == null) {
+      try {
+        cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
+      } catch (IOException exc) {
+        throw new RuntimeException("Error decoding manifest file bytes");
+      }
     }
+    return cachedManifestFile;
+  }
 
-    @SuppressWarnings("unused") // used via `DefaultCoder` annotation
-    public static CoderProvider getCoderProvider() {
-      return CoderProviders.forCoder(
-          TypeDescriptor.of(FileWriteResult.class), FileWriteResultCoder.of());
-    }
+  public static Builder builder() {
+    return new AutoValue_FileWriteResult.Builder();
   }
 
-  private static class FileWriteResultDeepEqualityWrapper {
-    private final FileWriteResult fileWriteResult;
+  @AutoValue.Builder
+  abstract static class Builder {
 
-    private FileWriteResultDeepEqualityWrapper(FileWriteResult fileWriteResult) {
-      this.fileWriteResult = fileWriteResult;
-    }
+    abstract Builder setTableIdentifierString(String tableIdString);
 
-    @Override
-    public boolean equals(@Nullable Object obj) {
-      if (obj == this) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (!(obj instanceof FileWriteResultDeepEqualityWrapper)) {
-        return false;
-      }
-      FileWriteResultDeepEqualityWrapper other = (FileWriteResultDeepEqualityWrapper) obj;
+    abstract Builder setManifestFileBytes(byte[] manifestFileBytes);
 
-      return Objects.equals(
-              fileWriteResult.getTableIdentifier(), other.fileWriteResult.getTableIdentifier())
-          && Objects.equals(
-              fileWriteResult.getPartitionSpec(), other.fileWriteResult.getPartitionSpec())
-          && dataFilesEqual(fileWriteResult.getDataFile(), other.fileWriteResult.getDataFile());
+    @SchemaIgnore
+    public Builder setTableIdentifier(TableIdentifier tableId) {
+      return setTableIdentifierString(tableId.toString());
     }
 
-    private boolean dataFilesEqual(DataFile first, DataFile second) {
-      return Objects.equals(first.pos(), second.pos())
-          && first.specId() == second.specId()
-          && Objects.equals(first.content(), second.content())
-          && Objects.equals(first.path(), second.path())
-          && Objects.equals(first.format(), second.format())
-          && Objects.equals(first.partition(), second.partition())
-          && first.recordCount() == second.recordCount()
-          && first.fileSizeInBytes() == second.fileSizeInBytes()
-          && Objects.equals(first.columnSizes(), second.columnSizes())
-          && Objects.equals(first.valueCounts(), second.valueCounts())
-          && Objects.equals(first.nullValueCounts(), second.nullValueCounts())
-          && Objects.equals(first.nanValueCounts(), second.nanValueCounts())
-          && Objects.equals(first.lowerBounds(), second.lowerBounds())
-          && Objects.equals(first.upperBounds(), second.upperBounds())
-          && Objects.equals(first.keyMetadata(), second.keyMetadata())
-          && Objects.equals(first.splitOffsets(), second.splitOffsets())
-          && Objects.equals(first.equalityFieldIds(), second.equalityFieldIds())
-          && Objects.equals(first.sortOrderId(), second.sortOrderId())
-          && Objects.equals(first.dataSequenceNumber(), second.dataSequenceNumber())
-          && Objects.equals(first.fileSequenceNumber(), second.fileSequenceNumber());
+    @SchemaIgnore
+    public Builder setManifestFile(ManifestFile manifestFile) throws IOException {
+      return setManifestFileBytes(ManifestFiles.encode(manifestFile));
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(
-          fileWriteResult.getTableIdentifier(),
-          fileWriteResult.getPartitionSpec(),
-          fileWriteResult.getDataFile());
-    }
+    public abstract FileWriteResult build();
   }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index aa203eb6eb6..859310bdcec 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import org.apache.beam.sdk.values.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.catalog.Catalog;
@@ -37,6 +40,7 @@ class RecordWriter {
   private final DataWriter<Record> icebergDataWriter;
 
   private final Table table;
+  private final String absoluteFilename;
 
  RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
       throws IOException {
@@ -46,9 +50,9 @@ class RecordWriter {
 
  RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
     this.table = table;
-
-    String absoluteFilename = table.location() + "/" + filename;
+    this.absoluteFilename = table.location() + "/" + filename;
     OutputFile outputFile = table.io().newOutputFile(absoluteFilename);
+
     switch (fileFormat) {
       case AVRO:
         icebergDataWriter =
@@ -92,7 +96,15 @@ class RecordWriter {
     return icebergDataWriter.length();
   }
 
-  public DataFile dataFile() {
-    return icebergDataWriter.toDataFile();
+  public ManifestFile getManifestFile() throws IOException {
+    String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest");
+    OutputFile outputFile = table.io().newOutputFile(manifestFilename);
+    ManifestWriter<DataFile> manifestWriter;
+    try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(getTable().spec(), outputFile)) {
+      openWriter.add(icebergDataWriter.toDataFile());
+      manifestWriter = openWriter;
+    }
+
+    return manifestWriter.toManifestFile();
   }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 731a9fefb49..c1126351944 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -95,8 +95,7 @@ class WriteGroupedRowsToFiles
       c.output(
           FileWriteResult.builder()
               .setTableIdentifier(destination.getTableIdentifier())
-              .setDataFile(writer.dataFile())
-              .setPartitionSpec(writer.getTable().spec())
+              .setManifestFile(writer.getManifestFile())
               .build());
     }
   }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index 917aab9e55c..a00f3de4bb4 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -267,7 +267,7 @@ class WriteUngroupedRowsToFiles
         out.get(WRITTEN_FILES_TAG)
             .output(
                 FileWriteResult.builder()
-                    .setDataFile(writer.dataFile())
+                    .setManifestFile(writer.getManifestFile())
                     .setTableIdentifier(destination.getTableIdentifier())
                     .build());
         writer = createAndInsertWriter(destination, window);
@@ -307,9 +307,8 @@ class WriteUngroupedRowsToFiles
                  getWindows().get(destination), "internal error: no windows for destination");
           c.output(
               FileWriteResult.builder()
-                  .setDataFile(writer.dataFile())
+                  .setManifestFile(writer.getManifestFile())
                   .setTableIdentifier(destination.getTableIdentifier())
-                  .setPartitionSpec(writer.getTable().spec())
                   .build(),
               window.maxTimestamp(),
               window);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java
deleted file mode 100644
index 64413059315..00000000000
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.UUID;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class FileWriteResultTest implements Serializable {
-
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
-  @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
-
-  private static final Coder<FileWriteResult> TEST_CODER =
-      FileWriteResult.FileWriteResultCoder.of();
-
-  private List<FileWriteResult> getTestValues() throws Exception {
-    TableIdentifier tableId =
-        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
-
-    // Create a table so we can have some DataFile objects
-    Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-    List<FileWriteResult> values = Lists.newArrayList();
-
-    // A parquet file
-    RecordWriter writer =
-        new RecordWriter(table, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString());
-    writer.write(
-        Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
-            .addValues(42L, "bizzle")
-            .build());
-    writer.close();
-    DataFile dataFile = writer.dataFile();
-    values.add(
-        FileWriteResult.builder()
-            .setDataFile(dataFile)
-            .setPartitionSpec(table.spec())
-            .setTableIdentifier(tableId)
-            .build());
-
-    // An avro file
-    writer = new RecordWriter(table, FileFormat.AVRO, TEMPORARY_FOLDER.newFile().toString());
-    writer.write(
-        Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
-            .addValues(42L, "bizzle")
-            .build());
-    writer.close();
-    dataFile = writer.dataFile();
-    values.add(
-        FileWriteResult.builder()
-            .setDataFile(dataFile)
-            .setPartitionSpec(table.spec())
-            .setTableIdentifier(tableId)
-            .build());
-
-    // Parquet file with a different schema
-    TableIdentifier tableId2 =
-        TableIdentifier.of(
-            "default", "othertable" + Long.toString(UUID.randomUUID().hashCode(), 16));
-    Schema schema =
-        new Schema(
-            required(1, "id", Types.LongType.get()),
-            optional(2, "data", Types.StringType.get()),
-            optional(
-                3,
-                "extra",
-                Types.StructType.of(
-                    Types.NestedField.required(4, "inner", Types.BinaryType.get()))));
-    Table table2 = warehouse.createTable(tableId2, schema);
-
-    // A parquet file in this other table
-    writer = new RecordWriter(table2, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString());
-    writer.write(
-        Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(schema))
-            .addValues(
-                42L,
-                "bizzle",
-                Row.withSchema(
-                        org.apache.beam.sdk.schemas.Schema.of(
-                            org.apache.beam.sdk.schemas.Schema.Field.of(
-                                "inner", org.apache.beam.sdk.schemas.Schema.FieldType.BYTES)))
-                    .addValues(new byte[] {0xa})
-                    .build())
-            .build());
-    writer.close();
-    DataFile dataFile2 = writer.dataFile();
-    values.add(
-        FileWriteResult.builder()
-            .setDataFile(dataFile2)
-            .setPartitionSpec(table2.spec())
-            .setTableIdentifier(tableId2)
-            .build());
-
-    return values;
-  }
-
-  @Test
-  public void testDecodeEncodeEqual() throws Exception {
-    for (FileWriteResult value : getTestValues()) {
-      CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value);
-    }
-  }
-
-  @Test
-  public void testDecodeEncodeVersionNumber() throws Exception {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    ByteArrayInputStream in;
-    for (FileWriteResult value : getTestValues()) {
-      TEST_CODER.encode(value, out);
-      in = new ByteArrayInputStream(out.toByteArray());
-
-      assertEquals(FileWriteResult.FileWriteResultCoder.VERSION, in.read());
-    }
-  }
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testEncodedTypeDescriptor() throws Exception {
-    assertThat(
-        TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(FileWriteResult.class)));
-  }
-}
