This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bc7bd0e  Flink: Add Avro value reader, writer implementations (#1153)
bc7bd0e is described below

commit bc7bd0e30839740d0d9d7430cac984c10659ca38
Author: openinx <[email protected]>
AuthorDate: Sat Jul 4 00:13:54 2020 +0800

    Flink: Add Avro value reader, writer implementations (#1153)
---
 .../org/apache/iceberg/data/avro/DataReader.java   |  11 +-
 .../org/apache/iceberg/data/avro/DataWriter.java   |  11 +-
 .../apache/iceberg/flink/data/FlinkAvroReader.java |  72 +++
 .../apache/iceberg/flink/data/FlinkAvroWriter.java |  49 ++
 .../org/apache/iceberg/flink/data/RandomData.java  |  30 ++
 .../flink/data/TestFlinkAvroReaderWriter.java      |  73 +++
 .../flink/data/TestFlinkParquetReaderWriter.java   |  31 +-
 .../iceberg/flink/data/TestRowProjection.java      | 570 +++++++++++++++++++++
 8 files changed, 809 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index 8c7d0f3..c13bd25 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -59,7 +59,7 @@ public class DataReader<T> implements DatumReader<T> {
   private Schema fileSchema = null;
 
   @SuppressWarnings("unchecked")
-  private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> idToConstant) {
+  protected DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> idToConstant) {
     this.readSchema = readSchema;
     this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor
         .visit(expectedSchema, readSchema, new ReadBuilder(idToConstant));
@@ -102,7 +102,25 @@ public class DataReader<T> implements DatumReader<T> {
     }
   }
 
-  private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+  /**
+   * Creates the reader for each record visited while the reader tree is built.
+   * <p>
+   * Subclasses may override this to produce a different struct representation. It is reached from the
+   * {@link DataReader} constructor (which visits the schema with a ReadBuilder), so an override must not
+   * depend on instance fields declared by the subclass: they have not been initialized yet when it runs.
+   *
+   * @param struct the expected Iceberg struct type
+   * @param fields readers for the struct's fields, as built by the schema visitor
+   * @param idToConstant the constants map that was passed to the constructor
+   * @return a reader that produces struct values
+   */
+  protected ValueReader<?> createStructReader(Types.StructType struct,
+                                              List<ValueReader<?>> fields, Map<Integer, ?> idToConstant) {
+    return GenericReaders.struct(struct, fields, idToConstant);
+  }
+
+  // Non-static so that record() can delegate to the overridable createStructReader hook.
+  private class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
     private final Map<Integer, ?> idToConstant;
 
     private ReadBuilder(Map<Integer, ?> idToConstant) {
@@ -112,7 +130,7 @@ public class DataReader<T> implements DatumReader<T> {
     @Override
     public ValueReader<?> record(Types.StructType struct, Schema record,
                                  List<String> names, List<ValueReader<?>> fields) {
-      return GenericReaders.struct(struct, fields, idToConstant);
+      return createStructReader(struct, fields, idToConstant);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
index fc47dd0..aa6cc6f 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
@@ -40,7 +40,7 @@ public class DataWriter<T> implements DatumWriter<T> {
     return new DataWriter<>(schema);
   }
 
-  private DataWriter(Schema schema) {
+  protected DataWriter(Schema schema) {
     setSchema(schema);
   }
 
@@ -55,13 +55,25 @@ public class DataWriter<T> implements DatumWriter<T> {
     writer.write(datum, out);
   }
 
-  private static class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
-    private WriteBuilder() {
-    }
+  /**
+   * Creates the writer for each record visited while the writer tree is built.
+   * <p>
+   * Subclasses may override this to write a different struct representation. Overrides should not depend on
+   * instance fields declared by the subclass, because the writer tree may be built during construction (the
+   * constructor calls {@code setSchema}).
+   *
+   * @param fields writers for the record's fields, as built by the schema visitor
+   * @return a writer for struct values
+   */
+  protected ValueWriter<?> createStructWriter(List<ValueWriter<?>> fields) {
+    return GenericWriters.struct(fields);
+  }
 
+  // Non-static so that record() can delegate to the overridable createStructWriter hook.
+  private class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
     @Override
     public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
-      return GenericWriters.struct(fields);
+      return createStructWriter(fields);
     }
 
     @Override
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
new file mode 100644
index 0000000..9f1c97f
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+
+/**
+ * An Avro {@link DataReader} that materializes Iceberg structs as Flink {@link Row}s.
+ */
+public class FlinkAvroReader extends DataReader<Row> {
+
+  /**
+   * Creates a reader producing Flink rows.
+   *
+   * @param expectedSchema the expected Iceberg schema of the rows produced
+   * @param readSchema the Avro read schema
+   */
+  public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+    // No field values are supplied as constants: the idToConstant map is always empty.
+    super(expectedSchema, readSchema, ImmutableMap.of());
+  }
+
+  @Override
+  protected ValueReader<?> createStructReader(
+      Types.StructType struct, List<ValueReader<?>> fields, Map<Integer, ?> idToConstant) {
+    // Runs while the superclass constructor builds the reader tree; uses no instance state of this class.
+    return new RowReader(fields, struct, idToConstant);
+  }
+
+  /**
+   * Reads a struct into a Flink {@link Row} with one slot per struct field.
+   */
+  private static class RowReader extends ValueReaders.StructReader<Row> {
+    private final int arity;
+
+    private RowReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      super(readers, struct, idToConstant);
+      this.arity = struct.fields().size();
+    }
+
+    @Override
+    protected Row reuseOrCreate(Object reuse) {
+      // Fill the caller-supplied Row when one is given; otherwise allocate a new one.
+      return reuse instanceof Row ? (Row) reuse : new Row(arity);
+    }
+
+    @Override
+    protected Object get(Row struct, int pos) {
+      return struct.getField(pos);
+    }
+
+    @Override
+    protected void set(Row struct, int pos, Object value) {
+      struct.setField(pos, value);
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
new file mode 100644
index 0000000..fdc0838
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.avro.ValueWriter;
+import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.data.avro.DataWriter;
+
+/**
+ * An Avro {@link DataWriter} that writes Flink {@link Row}s.
+ */
+public class FlinkAvroWriter extends DataWriter<Row> {
+
+  /**
+   * Creates a writer for rows described by the given Avro schema.
+   *
+   * @param schema the Avro write schema
+   */
+  public FlinkAvroWriter(Schema schema) {
+    super(schema);
+  }
+
+  @Override
+  protected ValueWriter<?> createStructWriter(List<ValueWriter<?>> fields) {
+    // Invoked by the superclass's writer builder; relies on no instance state of this class.
+    return new RowWriter(fields);
+  }
+
+  /**
+   * Adapts a Flink {@link Row} to the positional field access used by {@code ValueWriters.StructWriter}.
+   */
+  private static class RowWriter extends ValueWriters.StructWriter<Row> {
+    private RowWriter(List<ValueWriter<?>> fieldWriters) {
+      super(fieldWriters);
+    }
+
+    @Override
+    protected Object get(Row row, int pos) {
+      return row.getField(pos);
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
index cdf667b..4cd1efc 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
@@ -33,10 +33,45 @@ import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.RandomUtil;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 class RandomData {
+  /**
+   * A schema covering nested structs, a list of required doubles, a list of optional structs, and maps with
+   * timestamp and struct values; field ids are not in declaration order and one field name contains a space.
+   * Shared by the Flink Avro and Parquet reader/writer tests.
+   */
+  static final Schema COMPLEX_SCHEMA = new Schema(
+      required(1, "roots", Types.LongType.get()),
+      optional(3, "lime", Types.ListType.ofRequired(4, Types.DoubleType.get())),
+      required(5, "strict", Types.StructType.of(
+          required(9, "tangerine", Types.StringType.get()),
+          optional(6, "hopeful", Types.StructType.of(
+              required(7, "steel", Types.FloatType.get()),
+              required(8, "lantern", Types.DateType.get())
+          )),
+          optional(10, "vehement", Types.LongType.get())
+      )),
+      optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13,
+          Types.StringType.get(), Types.TimestampType.withZone())),
+      required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of(
+          optional(16, "beet", Types.DoubleType.get()),
+          required(17, "stamp", Types.FloatType.get()),
+          optional(18, "wheeze", Types.StringType.get())
+      ))),
+      optional(19, "renovate", Types.MapType.ofRequired(20, 21,
+          Types.StringType.get(), Types.StructType.of(
+              optional(22, "jumpy", Types.DoubleType.get()),
+              required(23, "koala", Types.IntegerType.get()),
+              required(24, "couch rope", Types.IntegerType.get())
+          ))),
+      optional(2, "slide", Types.StringType.get())
+  );
+
   private RandomData() {
   }
 
   private static Iterable<Row> generateData(Schema schema, int numRecords, Supplier<RandomRowGenerator> supplier) {
     return () -> new Iterator<Row>() {
       private final RandomRowGenerator generator = supplier.get();
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
new file mode 100644
index 0000000..06bf04b
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA;
+
+public class TestFlinkAvroReaderWriter {
+  private static final int NUM_RECORDS = 20_000;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  /**
+   * Writes {@code iterable} to a temporary Avro file with the Flink writer, reads it back with the Flink reader and
+   * checks that exactly {@code numRecords} rows come back, each equal to the matching row of a second pass over
+   * {@code iterable} (which therefore has to yield the same rows every time it is iterated).
+   */
+  private void testCorrectness(Schema schema, int numRecords, Iterable<Row> iterable) throws IOException {
+    File avroFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", avroFile.delete());
+
+    try (FileAppender<Row> appender = Avro.write(Files.localOutput(avroFile))
+        .schema(schema)
+        .createWriterFunc(FlinkAvroWriter::new)
+        .build()) {
+      appender.addAll(iterable);
+    }
+
+    try (CloseableIterable<Row> readBack = Avro.read(Files.localInput(avroFile))
+        .project(schema)
+        .createReaderFunc(FlinkAvroReader::new)
+        .build()) {
+      Iterator<Row> expectedRows = iterable.iterator();
+      Iterator<Row> actualRows = readBack.iterator();
+      for (int remaining = numRecords; remaining > 0; remaining -= 1) {
+        Assert.assertTrue("Should have expected number of rows", actualRows.hasNext());
+        Assert.assertEquals(expectedRows.next(), actualRows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", actualRows.hasNext());
+    }
+  }
+
+  @Test
+  public void testNormalData() throws IOException {
+    Iterable<Row> generated = RandomData.generate(COMPLEX_SCHEMA, NUM_RECORDS, 19982);
+    testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS, generated);
+  }
+}
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
 
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
index e0cb3f0..41ea960 100644
--- 
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
+++ 
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
@@ -28,14 +28,12 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA;
 
 public class TestFlinkParquetReaderWriter {
   private static final int NUM_RECORDS = 20_000;
@@ -43,33 +41,6 @@ public class TestFlinkParquetReaderWriter {
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private static final Schema COMPLEX_SCHEMA = new Schema(
-      required(1, "roots", Types.LongType.get()),
-      optional(3, "lime", Types.ListType.ofRequired(4, 
Types.DoubleType.get())),
-      required(5, "strict", Types.StructType.of(
-          required(9, "tangerine", Types.StringType.get()),
-          optional(6, "hopeful", Types.StructType.of(
-              required(7, "steel", Types.FloatType.get()),
-              required(8, "lantern", Types.DateType.get())
-          )),
-          optional(10, "vehement", Types.LongType.get())
-      )),
-      optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13,
-          Types.StringType.get(), Types.TimestampType.withZone())),
-      required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of(
-          optional(16, "beet", Types.DoubleType.get()),
-          required(17, "stamp", Types.FloatType.get()),
-          optional(18, "wheeze", Types.StringType.get())
-      ))),
-      optional(19, "renovate", Types.MapType.ofRequired(20, 21,
-          Types.StringType.get(), Types.StructType.of(
-              optional(22, "jumpy", Types.DoubleType.get()),
-              required(23, "koala", Types.IntegerType.get()),
-              required(24, "couch rope", Types.IntegerType.get())
-          ))),
-      optional(2, "slide", Types.StringType.get())
-  );
-
   private void testCorrectness(Schema schema, int numRecords, Iterable<Row> 
iterable) throws IOException {
     File testFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", testFile.delete());
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java 
b/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
new file mode 100644
index 0000000..23b4373
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRowProjection {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private Row writeAndRead(String desc, Schema writeSchema, Schema readSchema, Row row) throws IOException {
+    File file = temp.newFile(desc + ".avro");
+    Assert.assertTrue(file.delete());
+    // Write the single row with writeSchema, then read it back projected to readSchema.
+    try (FileAppender<Row> appender = Avro.write(Files.localOutput(file))
+        .schema(writeSchema)
+        .createWriterFunc(FlinkAvroWriter::new)
+        .build()) {
+      appender.add(row);
+    }
+    // The reader is closeable; close it so that whatever it holds open is released after the read.
+    try (org.apache.iceberg.io.CloseableIterable<Row> records = Avro.read(Files.localInput(file))
+        .project(readSchema)
+        .createReaderFunc(FlinkAvroReader::new)
+        .build()) {
+      return Iterables.getOnlyElement(records);
+    }
+  }
+
+  @Test
+  public void testFullProjection() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get())
+    );
+
+    Row row = Row.of(34L, "test");
+    // Reading with the write schema itself should return every field unchanged.
+    Row projected = writeAndRead("full_projection", schema, schema, row);
+
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) projected.getField(0));
+
+    int cmp = Comparators.charSequences()
+        .compare("test", (CharSequence) projected.getField(1));
+    Assert.assertEquals("Should contain the correct data value", 0, cmp);
+  }
+
+  @Test
+  public void testSpecialCharacterProjection() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "user id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data%0", Types.StringType.get())
+    );
+
+    Row row = Row.of(34L, "test");
+    // Full read: field names containing a space and '%' must round-trip; check the row read back, not the input.
+    Row full = writeAndRead("special_chars", schema, schema, row);
+
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) full.getField(0));
+    Assert.assertEquals("Should contain the correct data value",
+        0,
+        Comparators.charSequences().compare("test", (CharSequence) full.getField(1)));
+
+    Row projected = writeAndRead("special_characters", schema, schema.select("data%0"), row);
+    // Projection that selects only the specially named data column.
+    Assert.assertEquals("Should not contain id value", 1, projected.getArity());
+    Assert.assertEquals("Should contain the correct data value",
+        0,
+        Comparators.charSequences().compare("test", (CharSequence) projected.getField(0)));
+  }
+
+  @Test
+  public void testReorderedFullProjection() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get())
+    );
+
+    // Same fields as the write schema with "data" first: values must come back in read-schema order.
+    Row original = Row.of(34L, "test");
+    Schema dataFirst = new Schema(
+        Types.NestedField.optional(1, "data", Types.StringType.get()),
+        Types.NestedField.required(0, "id", Types.LongType.get())
+    );
+
+    Row result = writeAndRead("full_projection", schema, dataFirst, original);
+    String first = result.getField(0).toString();
+    Assert.assertEquals("Should contain the correct 0 value", "test", first);
+    Assert.assertEquals("Should contain the correct 1 value", 34L, result.getField(1));
+  }
+
+  @Test
+  public void testReorderedProjection() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get())
+    );
+
+    Row original = Row.of(34L, "test");
+    // The read schema places the written "data" column between two ids that were never written.
+    Schema withMissing = new Schema(
+        Types.NestedField.optional(2, "missing_1", Types.StringType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "missing_2", Types.LongType.get())
+    );
+
+    Row result = writeAndRead("full_projection", schema, withMissing, original);
+    // Columns absent from the file are expected to be read as null.
+    Assert.assertNull("Should contain the correct 0 value", result.getField(0));
+    Assert.assertEquals("Should contain the correct 1 value", "test", result.getField(1).toString());
+    Assert.assertNull("Should contain the correct 2 value", result.getField(2));
+  }
+
+  @Test
+  public void testRenamedAddedField() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "a", Types.LongType.get()),
+        Types.NestedField.required(2, "b", Types.LongType.get()),
+        Types.NestedField.required(3, "d", Types.LongType.get())
+    );
+
+    Row row = Row.of(100L, 200L, 300L);
+    // Field 3 is renamed from "d" to "c", and a new optional field 4 takes the name "d".
+    Schema renamedAdded = new Schema(
+        Types.NestedField.optional(1, "a", Types.LongType.get()),
+        Types.NestedField.optional(2, "b", Types.LongType.get()),
+        Types.NestedField.optional(3, "c", Types.LongType.get()),
+        Types.NestedField.optional(4, "d", Types.LongType.get())
+    );
+
+    Row projected = writeAndRead("rename_and_add_column_projection", schema, renamedAdded, row);
+    Assert.assertEquals("Should contain the correct value in column 1", 100L, projected.getField(0));
+    Assert.assertEquals("Should contain the correct value in column 2", 200L, projected.getField(1));
+    Assert.assertEquals("Should contain the correct value in column 3", 300L, projected.getField(2));
+    Assert.assertNull("Should contain empty value on new column 4", projected.getField(3));
+  }
+
+  @Test
+  public void testEmptyProjection() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get())
+    );
+
+    Row original = Row.of(34L, "test");
+    // Selecting no columns should still yield a row, just one without any fields.
+    Row empty = writeAndRead("empty_projection", schema, schema.select(), original);
+
+    Assert.assertNotNull("Should read a non-null record", empty);
+    try {
+      empty.getField(0);
+      Assert.fail("Should not retrieve value with ordinal 0");
+    } catch (ArrayIndexOutOfBoundsException ignored) {
+      // expected: the projected row has no fields, so ordinal 0 is out of range
+    }
+  }
+
+  @Test
+  public void testBasicProjection() throws Exception {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get())
+    );
+
+    Row original = Row.of(34L, "test");
+    // First read back only the id column.
+    Schema idOnly = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get())
+    );
+
+    Row idRow = writeAndRead("basic_projection_id", writeSchema, idOnly, original);
+    Assert.assertEquals("Should not project data", 1, idRow.getArity());
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) idRow.getField(0));
+    // Then read back only the data column.
+    Schema dataOnly = new Schema(
+        Types.NestedField.optional(1, "data", Types.StringType.get())
+    );
+
+    Row dataRow = writeAndRead("basic_projection_data", writeSchema, dataOnly, original);
+
+    Assert.assertEquals("Should not project id", 1, dataRow.getArity());
+    int cmp = Comparators.charSequences().compare("test", (CharSequence) dataRow.getField(0));
+    Assert.assertEquals("Should contain the correct data value", 0, cmp);
+  }
+
+  @Test
+  public void testRename() throws Exception {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get())
+    );
+
+    Row original = Row.of(34L, "test");
+    // Field 1 keeps its id but is renamed from "data" to "renamed"; its value must still be read.
+    Schema readSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "renamed", Types.StringType.get())
+    );
+
+    Row renamedRow = writeAndRead("project_and_rename", writeSchema, readSchema, original);
+
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) renamedRow.getField(0));
+    int cmp = Comparators.charSequences().compare("test", (CharSequence) renamedRow.getField(1));
+    Assert.assertEquals("Should contain the correct data/renamed value", 0, cmp);
+  }
+
+  @Test
+  public void testNestedStructProjection() throws Exception {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(3, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    Row location = Row.of(52.995143f, -1.539054f);
+    Row record = Row.of(34L, location);
+
+    Schema idOnly = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get())
+    );
+
+    Row projected = writeAndRead("id_only", writeSchema, idOnly, record);
+    Assert.assertEquals("Should not project location", 1, projected.getArity());
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) projected.getField(0));
+    // Project only the nested latitude field.
+    Schema latOnly = new Schema(
+        Types.NestedField.optional(3, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get())
+        ))
+    );
+
+    projected = writeAndRead("latitude_only", writeSchema, latOnly, record);
+    Row projectedLocation = (Row) projected.getField(0);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertNotNull("Should project location", projected.getField(0));
+    Assert.assertEquals("Should not project longitude", 1, projectedLocation.getArity());
+    Assert.assertEquals("Should project latitude",
+        52.995143f, (float) projectedLocation.getField(0), 0.000001f);
+    // Project only the nested longitude field.
+    Schema longOnly = new Schema(
+        Types.NestedField.optional(3, "location", Types.StructType.of(
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    projected = writeAndRead("longitude_only", writeSchema, longOnly, record);
+    projectedLocation = (Row) projected.getField(0);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertNotNull("Should project location", projected.getField(0));
+    Assert.assertEquals("Should not project latitude", 1, projectedLocation.getArity());
+    Assert.assertEquals("Should project longitude",
+        -1.539054f, (float) projectedLocation.getField(0), 0.000001f);
+    // Selecting the struct by name projects both of its fields.
+    Schema locationOnly = writeSchema.select("location");
+    projected = writeAndRead("location_only", writeSchema, locationOnly, record);
+    projectedLocation = (Row) projected.getField(0);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertNotNull("Should project location", projected.getField(0));
+    Assert.assertEquals("Should project latitude",
+        52.995143f, (float) projectedLocation.getField(0), 0.000001f);
+    Assert.assertEquals("Should project longitude",
+        -1.539054f, (float) projectedLocation.getField(1), 0.000001f);
+  }
+
+  @Test
+  public void testMapProjection() throws IOException {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "properties",
+            Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StringType.get()))
+    );
+
+    Map<String, String> properties = ImmutableMap.of("a", "A", "b", "B");
+
+    Row row = Row.of(34L, properties);
+
+    Schema idOnly = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get())
+    );
+
+    Row projected = writeAndRead("id_only", writeSchema, idOnly, row);
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) projected.getField(0));
+    Assert.assertEquals("Should not project properties map", 1, projected.getArity());
+    // Selecting only the map key still projects the whole map.
+    Schema keyOnly = writeSchema.select("properties.key");
+    projected = writeAndRead("key_only", writeSchema, keyOnly, row);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertEquals("Should project entire map", properties, toStringMap((Map<?, ?>) projected.getField(0)));
+    // Selecting only the map value still projects the whole map.
+    Schema valueOnly = writeSchema.select("properties.value");
+    projected = writeAndRead("value_only", writeSchema, valueOnly, row);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertEquals("Should project entire map",
+        properties, toStringMap((Map<?, ?>) projected.getField(0)));
+
+    Schema mapOnly = writeSchema.select("properties");
+    projected = writeAndRead("map_only", writeSchema, mapOnly, row);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertEquals("Should project entire map", properties, toStringMap((Map<?, ?>) projected.getField(0)));
+  }
+
+  private Map<String, ?> toStringMap(Map<?, ?> map) {
+    // Lets read-back maps be compared with String-keyed expectations: keys and CharSequence values are
+    // converted with toString(), while other values (e.g. nested Rows) are kept as-is.
+    Map<String, Object> converted = Maps.newHashMap();
+    for (Map.Entry<?, ?> entry : map.entrySet()) {
+      Object value = entry.getValue();
+      Object normalized = value instanceof CharSequence ? value.toString() : value;
+      converted.put(entry.getKey().toString(), normalized);
+    }
+    return converted;
+  }
+
+  /**
+   * Verifies projection of a map whose values are structs: the id alone, the whole map, each struct
+   * field on its own, and a struct field read back under a new name (matched by field id).
+   */
+  @Test
+  public void testMapOfStructsProjection() throws IOException {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7,
+            Types.StringType.get(),
+            Types.StructType.of(
+                Types.NestedField.required(1, "lat", Types.FloatType.get()),
+                Types.NestedField.required(2, "long", Types.FloatType.get())
+            )
+        ))
+    );
+
+    Row l1 = Row.of(53.992811f, -1.542616f);
+    Row l2 = Row.of(52.995143f, -1.539054f);
+    Row row = Row.of(34L, ImmutableMap.of("L1", l1, "L2", l2));
+
+    Schema idOnly = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get())
+    );
+
+    Row projected = writeAndRead("id_only", writeSchema, idOnly, row);
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) projected.getField(0));
+    Assert.assertEquals("Should not project locations map", 1, projected.getArity());
+
+    projected = writeAndRead("all_locations", writeSchema, writeSchema.select("locations"), row);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertEquals("Should project locations map",
+        row.getField(1), toStringMap((Map<?, ?>) projected.getField(0)));
+
+    projected = writeAndRead("lat_only", writeSchema, writeSchema.select("locations.lat"), row);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    // Check the raw field before converting: toStringMap never returns null, so asserting on its
+    // result could not catch a missing map.
+    Assert.assertNotNull("Should project locations map", projected.getField(0));
+    Map<String, ?> locations = toStringMap((Map<?, ?>) projected.getField(0));
+    Assert.assertEquals("Should contain L1 and L2",
+        Sets.newHashSet("L1", "L2"), locations.keySet());
+    Row projectedL1 = (Row) locations.get("L1");
+    Assert.assertNotNull("L1 should not be null", projectedL1);
+    Assert.assertEquals("L1 should contain lat",
+        53.992811f, (float) projectedL1.getField(0), 0.000001);
+    Assert.assertEquals("L1 should not contain long", 1, projectedL1.getArity());
+    Row projectedL2 = (Row) locations.get("L2");
+    Assert.assertNotNull("L2 should not be null", projectedL2);
+    Assert.assertEquals("L2 should contain lat",
+        52.995143f, (float) projectedL2.getField(0), 0.000001);
+    Assert.assertEquals("L2 should not contain long", 1, projectedL2.getArity());
+
+    projected = writeAndRead("long_only",
+        writeSchema, writeSchema.select("locations.long"), row);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertNotNull("Should project locations map", projected.getField(0));
+    locations = toStringMap((Map<?, ?>) projected.getField(0));
+    Assert.assertEquals("Should contain L1 and L2",
+        Sets.newHashSet("L1", "L2"), locations.keySet());
+    projectedL1 = (Row) locations.get("L1");
+    Assert.assertNotNull("L1 should not be null", projectedL1);
+    Assert.assertEquals("L1 should not contain lat", 1, projectedL1.getArity());
+    Assert.assertEquals("L1 should contain long",
+        -1.542616f, (float) projectedL1.getField(0), 0.000001);
+    projectedL2 = (Row) locations.get("L2");
+    Assert.assertNotNull("L2 should not be null", projectedL2);
+    Assert.assertEquals("L2 should not contain lat", 1, projectedL2.getArity());
+    Assert.assertEquals("L2 should contain long",
+        -1.539054f, (float) projectedL2.getField(0), 0.000001);
+
+    // Field id 1 is read under the new name "latitude"; values must still be resolved by id.
+    Schema latitudeRenamed = new Schema(
+        Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7,
+            Types.StringType.get(),
+            Types.StructType.of(
+                Types.NestedField.required(1, "latitude", Types.FloatType.get())
+            )
+        ))
+    );
+
+    projected = writeAndRead("latitude_renamed", writeSchema, latitudeRenamed, row);
+    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    Assert.assertNotNull("Should project locations map", projected.getField(0));
+    locations = toStringMap((Map<?, ?>) projected.getField(0));
+    Assert.assertEquals("Should contain L1 and L2",
+        Sets.newHashSet("L1", "L2"), locations.keySet());
+    projectedL1 = (Row) locations.get("L1");
+    Assert.assertNotNull("L1 should not be null", projectedL1);
+    Assert.assertEquals("L1 should contain latitude",
+        53.992811f, (float) projectedL1.getField(0), 0.000001);
+    Assert.assertEquals("L1 should not contain long", 1, projectedL1.getArity());
+    projectedL2 = (Row) locations.get("L2");
+    Assert.assertNotNull("L2 should not be null", projectedL2);
+    Assert.assertEquals("L2 should contain latitude",
+        52.995143f, (float) projectedL2.getField(0), 0.000001);
+    Assert.assertEquals("L2 should not contain long", 1, projectedL2.getArity());
+  }
+
+  @Test
+  public void testListProjection() throws IOException {
+    // Source schema: a required id plus an optional list of longs.
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(10, "values", Types.ListType.ofOptional(11, Types.LongType.get()))
+    );
+    List<Long> expectedValues = ImmutableList.of(56L, 57L, 58L);
+    Row written = Row.of(34L, expectedValues);
+
+    // Projecting only the id must drop the list column.
+    Schema idProjection = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));
+    Row idRow = writeAndRead("id_only", writeSchema, idProjection, written);
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) idRow.getField(0));
+    Assert.assertEquals("Should not project values list", 1, idRow.getArity());
+
+    // Selecting just the element still yields the whole list.
+    Row elementRow = writeAndRead("element_only", writeSchema, writeSchema.select("values.element"), written);
+    Assert.assertEquals("Should not project id", 1, elementRow.getArity());
+    Assert.assertEquals("Should project entire list", expectedValues, elementRow.getField(0));
+
+    // Selecting the list column itself yields the same list.
+    Row listRow = writeAndRead("list_only", writeSchema, writeSchema.select("values"), written);
+    Assert.assertEquals("Should not project id", 1, listRow.getArity());
+    Assert.assertEquals("Should project entire list", expectedValues, listRow.getField(0));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testListOfStructsProjection() throws IOException {
+    // Source schema: id plus an optional list of (required x, optional y) structs.
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(22, "points",
+            Types.ListType.ofOptional(21, Types.StructType.of(
+                Types.NestedField.required(19, "x", Types.IntegerType.get()),
+                Types.NestedField.optional(18, "y", Types.IntegerType.get())
+            ))
+        )
+    );
+    Row written = Row.of(34L, ImmutableList.of(Row.of(1, 2), Row.of(3, null)));
+
+    // Id only: the points list is dropped.
+    Row idRow = writeAndRead("id_only", writeSchema,
+        new Schema(Types.NestedField.required(0, "id", Types.LongType.get())), written);
+    Assert.assertEquals("Should contain the correct id value", 34L, (long) idRow.getField(0));
+    Assert.assertEquals("Should not project points list", 1, idRow.getArity());
+
+    // Whole list: round-trips unchanged.
+    Row allRow = writeAndRead("all_points", writeSchema, writeSchema.select("points"), written);
+    Assert.assertEquals("Should not project id", 1, allRow.getArity());
+    Assert.assertEquals("Should project points list", written.getField(1), allRow.getField(0));
+
+    // Only x selected: each point comes back as a one-field row holding x.
+    Row xRow = writeAndRead("x_only", writeSchema, writeSchema.select("points.x"), written);
+    Assert.assertEquals("Should not project id", 1, xRow.getArity());
+    Assert.assertNotNull("Should project points list", xRow.getField(0));
+    List<Row> xPoints = (List<Row>) xRow.getField(0);
+    Assert.assertEquals("Should read 2 points", 2, xPoints.size());
+    Row xFirst = xPoints.get(0);
+    Assert.assertEquals("Should project x", 1, (int) xFirst.getField(0));
+    Assert.assertEquals("Should not project y", 1, xFirst.getArity());
+    Row xSecond = xPoints.get(1);
+    Assert.assertEquals("Should not project y", 1, xSecond.getArity());
+    Assert.assertEquals("Should project x", 3, (int) xSecond.getField(0));
+
+    // Only y selected: the second point's null y must survive the round trip.
+    Row yRow = writeAndRead("y_only", writeSchema, writeSchema.select("points.y"), written);
+    Assert.assertEquals("Should not project id", 1, yRow.getArity());
+    Assert.assertNotNull("Should project points list", yRow.getField(0));
+    List<Row> yPoints = (List<Row>) yRow.getField(0);
+    Assert.assertEquals("Should read 2 points", 2, yPoints.size());
+    Row yFirst = yPoints.get(0);
+    Assert.assertEquals("Should not project x", 1, yFirst.getArity());
+    Assert.assertEquals("Should project y", 2, (int) yFirst.getField(0));
+    Row ySecond = yPoints.get(1);
+    Assert.assertEquals("Should not project x", 1, ySecond.getArity());
+    Assert.assertNull("Should project null y", ySecond.getField(0));
+
+    // Field id 18 read under the new name "z": values are resolved by id, not by name.
+    Schema yRenamed = new Schema(
+        Types.NestedField.optional(22, "points",
+            Types.ListType.ofOptional(21, Types.StructType.of(
+                Types.NestedField.optional(18, "z", Types.IntegerType.get())
+            ))
+        )
+    );
+    Row zRow = writeAndRead("y_renamed", writeSchema, yRenamed, written);
+    Assert.assertEquals("Should not project id", 1, zRow.getArity());
+    Assert.assertNotNull("Should project points list", zRow.getField(0));
+    List<Row> zPoints = (List<Row>) zRow.getField(0);
+    Assert.assertEquals("Should read 2 points", 2, zPoints.size());
+    Row zFirst = zPoints.get(0);
+    Assert.assertEquals("Should not project x and y", 1, zFirst.getArity());
+    Assert.assertEquals("Should project z", 2, (int) zFirst.getField(0));
+    Row zSecond = zPoints.get(1);
+    Assert.assertEquals("Should not project x and y", 1, zSecond.getArity());
+    Assert.assertNull("Should project null z", zSecond.getField(0));
+  }
+
+  /**
+   * Verifies that optional columns added to the read schema, whose nested children are required
+   * (struct field, list element, map key/value), are read as null when absent from the written file.
+   */
+  @Test
+  public void testAddedFieldsWithRequiredChildren() throws IOException {
+    // The file is written with only column "a".
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "a", Types.LongType.get())
+    );
+
+    Row row = Row.of(100L);
+
+    // Read schema adds optional parents (ids 2, 4, 6) that contain required children.
+    Schema addedFields = new Schema(
+        Types.NestedField.optional(1, "a", Types.LongType.get()),
+        Types.NestedField.optional(2, "b", Types.StructType.of(
+            Types.NestedField.required(3, "c", Types.LongType.get())
+        )),
+        Types.NestedField.optional(4, "d", Types.ListType.ofRequired(5, Types.LongType.get())),
+        Types.NestedField.optional(6, "e", Types.MapType.ofRequired(7, 8, Types.LongType.get(), Types.LongType.get()))
+    );
+
+    Row projected = writeAndRead("add_fields_with_required_children_projection", schema, addedFields, row);
+    Assert.assertEquals("Should project all 4 columns", 4, projected.getArity());
+    // JUnit's assertEquals takes (message, expected, actual).
+    Assert.assertEquals("Should contain the correct value in column 1", 100L, (long) projected.getField(0));
+    Assert.assertNull("Should contain empty value in new column 2", projected.getField(1));
+    Assert.assertNull("Should contain empty value in new column 4", projected.getField(2));
+    Assert.assertNull("Should contain empty value in new column 6", projected.getField(3));
+  }
+}

Reply via email to