This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.9.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit e1163458fac0c1c7749e169714e17d5e2a4af203
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Aug 6 01:37:38 2020 +0900

    Core: Use array instead of list in BaseCombinedScanTask to fix 
serialization (#1285)
---
 .../org/apache/iceberg/BaseCombinedScanTask.java   |  10 +-
 .../apache/iceberg/TestScanTaskSerialization.java  | 210 +++++++++++++++++++++
 .../iceberg/TestScanTaskSerialization24.java       |  29 +--
 .../apache/iceberg/TestScanTaskSerialization3.java |  29 +--
 4 files changed, 218 insertions(+), 60 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java 
b/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java
index 98c6943..253ddfa 100644
--- a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java
@@ -23,22 +23,24 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
 public class BaseCombinedScanTask implements CombinedScanTask {
-  private final List<FileScanTask> tasks;
+  private final FileScanTask[] tasks;
 
   public BaseCombinedScanTask(FileScanTask... tasks) {
-    this.tasks = ImmutableList.copyOf(tasks);
+    this.tasks = tasks;
   }
 
   public BaseCombinedScanTask(List<FileScanTask> tasks) {
-    this.tasks = ImmutableList.copyOf(tasks);
+    Preconditions.checkNotNull(tasks, "tasks cannot be null");
+    this.tasks = tasks.stream().toArray(FileScanTask[]::new);
   }
 
   @Override
   public Collection<FileScanTask> files() {
-    return tasks;
+    return ImmutableList.copyOf(tasks);
   }
 
   @Override
diff --git 
a/spark/src/test/java/org/apache/iceberg/TestScanTaskSerialization.java 
b/spark/src/test/java/org/apache/iceberg/TestScanTaskSerialization.java
new file mode 100644
index 0000000..f443f38
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/TestScanTaskSerialization.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public abstract class TestScanTaskSerialization extends SparkTestBase {
+
+  private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "c1", Types.IntegerType.get()),
+      optional(2, "c2", Types.StringType.get()),
+      optional(3, "c3", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private String tableLocation = null;
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    File tableDir = temp.newFolder();
+    this.tableLocation = tableDir.toURI().toString();
+  }
+
+  @Test
+  public void testBaseCombinedScanTaskKryoSerialization() throws Exception {
+    BaseCombinedScanTask scanTask = prepareBaseCombinedScanTaskForSerDeTest();
+
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+    Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+    try (Output out = new Output(new FileOutputStream(data))) {
+      kryo.writeClassAndObject(out, scanTask);
+    }
+
+    try (Input in = new Input(new FileInputStream(data))) {
+      Object obj = kryo.readClassAndObject(in);
+      Assert.assertTrue("Should be a BaseCombinedScanTask", obj instanceof 
BaseCombinedScanTask);
+      checkBaseCombinedScanTask(scanTask, (BaseCombinedScanTask) obj);
+    }
+  }
+
+  @Test
+  public void testBaseCombinedScanTaskJavaSerialization() throws Exception {
+    BaseCombinedScanTask scanTask = prepareBaseCombinedScanTaskForSerDeTest();
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(scanTask);
+    }
+
+    try (ObjectInputStream in = new ObjectInputStream(new 
ByteArrayInputStream(bytes.toByteArray()))) {
+      Object obj = in.readObject();
+      Assert.assertTrue("Should be a BaseCombinedScanTask", obj instanceof 
BaseCombinedScanTask);
+      checkBaseCombinedScanTask(scanTask, (BaseCombinedScanTask) obj);
+    }
+  }
+
+  private BaseCombinedScanTask prepareBaseCombinedScanTaskForSerDeTest() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return new BaseCombinedScanTask(Lists.newArrayList(tasks));
+  }
+
+  private void checkBaseCombinedScanTask(BaseCombinedScanTask expected, 
BaseCombinedScanTask actual) {
+    List<FileScanTask> expectedTasks = 
getFileScanTasksInFilePathOrder(expected);
+    List<FileScanTask> actualTasks = getFileScanTasksInFilePathOrder(actual);
+
+    Assert.assertEquals("The number of file scan tasks should match",
+        expectedTasks.size(), actualTasks.size());
+
+    for (int i = 0; i < expectedTasks.size(); i++) {
+      FileScanTask expectedTask = expectedTasks.get(i);
+      FileScanTask actualTask = actualTasks.get(i);
+      checkFileScanTask(expectedTask, actualTask);
+    }
+  }
+
+  private List<FileScanTask> 
getFileScanTasksInFilePathOrder(BaseCombinedScanTask task) {
+    return task.files().stream()
+        // use file path + start position to differentiate the tasks
+        .sorted(Comparator.comparing(o -> o.file().path().toString() + "##" + 
o.start()))
+        .collect(Collectors.toList());
+  }
+
+  private void checkFileScanTask(FileScanTask expected, FileScanTask actual) {
+    checkDataFile(expected.file(), actual.file());
+
+    // PartitionSpec implements its own equals method
+    Assert.assertEquals("PartitionSpec doesn't match", expected.spec(), 
actual.spec());
+
+    Assert.assertEquals("starting position doesn't match", expected.start(), 
actual.start());
+
+    Assert.assertEquals("the number of bytes to scan doesn't match", 
expected.start(), actual.start());
+
+    // simplify comparison on residual expression via comparing toString
+    Assert.assertEquals("Residual expression doesn't match",
+        expected.residual().toString(), actual.residual().toString());
+  }
+
+  // TODO: this is a copy of TestDataFileSerialization.checkDataFile, 
deduplicate
+  private void checkDataFile(DataFile expected, DataFile actual) {
+    Assert.assertEquals("Should match the serialized record path",
+        expected.path(), actual.path());
+    Assert.assertEquals("Should match the serialized record format",
+        expected.format(), actual.format());
+    Assert.assertEquals("Should match the serialized record partition",
+        expected.partition().get(0, Object.class), actual.partition().get(0, 
Object.class));
+    Assert.assertEquals("Should match the serialized record count",
+        expected.recordCount(), actual.recordCount());
+    Assert.assertEquals("Should match the serialized record size",
+        expected.fileSizeInBytes(), actual.fileSizeInBytes());
+    Assert.assertEquals("Should match the serialized record value counts",
+        expected.valueCounts(), actual.valueCounts());
+    Assert.assertEquals("Should match the serialized record null value counts",
+        expected.nullValueCounts(), actual.nullValueCounts());
+    Assert.assertEquals("Should match the serialized record lower bounds",
+        expected.lowerBounds(), actual.lowerBounds());
+    Assert.assertEquals("Should match the serialized record upper bounds",
+        expected.upperBounds(), actual.upperBounds());
+    Assert.assertEquals("Should match the serialized record key metadata",
+        expected.keyMetadata(), actual.keyMetadata());
+    Assert.assertEquals("Should match the serialized record offsets",
+        expected.splitOffsets(), actual.splitOffsets());
+    Assert.assertEquals("Should match the serialized record offsets",
+        expected.keyMetadata(), actual.keyMetadata());
+  }
+
+  private void writeRecords(List<ThreeColumnRecord> records) {
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+  }
+
+  private void writeDF(Dataset<Row> df) {
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java 
b/spark2/src/test/java/org/apache/iceberg/TestScanTaskSerialization24.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java
copy to spark2/src/test/java/org/apache/iceberg/TestScanTaskSerialization24.java
index 98c6943..28184b7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java
+++ b/spark2/src/test/java/org/apache/iceberg/TestScanTaskSerialization24.java
@@ -19,32 +19,5 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
-import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-
-public class BaseCombinedScanTask implements CombinedScanTask {
-  private final List<FileScanTask> tasks;
-
-  public BaseCombinedScanTask(FileScanTask... tasks) {
-    this.tasks = ImmutableList.copyOf(tasks);
-  }
-
-  public BaseCombinedScanTask(List<FileScanTask> tasks) {
-    this.tasks = ImmutableList.copyOf(tasks);
-  }
-
-  @Override
-  public Collection<FileScanTask> files() {
-    return tasks;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("tasks", Joiner.on(", ").join(tasks))
-        .toString();
-  }
+public class TestScanTaskSerialization24 extends TestScanTaskSerialization {
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java 
b/spark3/src/test/java/org/apache/iceberg/TestScanTaskSerialization3.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java
copy to spark3/src/test/java/org/apache/iceberg/TestScanTaskSerialization3.java
index 98c6943..035aa49 100644
--- a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java
+++ b/spark3/src/test/java/org/apache/iceberg/TestScanTaskSerialization3.java
@@ -19,32 +19,5 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
-import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-
-public class BaseCombinedScanTask implements CombinedScanTask {
-  private final List<FileScanTask> tasks;
-
-  public BaseCombinedScanTask(FileScanTask... tasks) {
-    this.tasks = ImmutableList.copyOf(tasks);
-  }
-
-  public BaseCombinedScanTask(List<FileScanTask> tasks) {
-    this.tasks = ImmutableList.copyOf(tasks);
-  }
-
-  @Override
-  public Collection<FileScanTask> files() {
-    return tasks;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("tasks", Joiner.on(", ").join(tasks))
-        .toString();
-  }
+public class TestScanTaskSerialization3 extends TestScanTaskSerialization {
 }

Reply via email to