This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bc7bd0e Flink: Add Avro value reader, writer implementations (#1153)
bc7bd0e is described below
commit bc7bd0e30839740d0d9d7430cac984c10659ca38
Author: openinx <[email protected]>
AuthorDate: Sat Jul 4 00:13:54 2020 +0800
Flink: Add Avro value reader, writer implementations (#1153)
---
.../org/apache/iceberg/data/avro/DataReader.java | 11 +-
.../org/apache/iceberg/data/avro/DataWriter.java | 11 +-
.../apache/iceberg/flink/data/FlinkAvroReader.java | 72 +++
.../apache/iceberg/flink/data/FlinkAvroWriter.java | 49 ++
.../org/apache/iceberg/flink/data/RandomData.java | 30 ++
.../flink/data/TestFlinkAvroReaderWriter.java | 73 +++
.../flink/data/TestFlinkParquetReaderWriter.java | 31 +-
.../iceberg/flink/data/TestRowProjection.java | 570 +++++++++++++++++++++
8 files changed, 809 insertions(+), 38 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index 8c7d0f3..c13bd25 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -59,7 +59,7 @@ public class DataReader<T> implements DatumReader<T> {
private Schema fileSchema = null;
@SuppressWarnings("unchecked")
- private DataReader(org.apache.iceberg.Schema expectedSchema, Schema
readSchema, Map<Integer, ?> idToConstant) {
+ protected DataReader(org.apache.iceberg.Schema expectedSchema, Schema
readSchema, Map<Integer, ?> idToConstant) {
this.readSchema = readSchema;
this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor
.visit(expectedSchema, readSchema, new ReadBuilder(idToConstant));
@@ -102,7 +102,12 @@ public class DataReader<T> implements DatumReader<T> {
}
}
- private static class ReadBuilder extends
AvroSchemaWithTypeVisitor<ValueReader<?>> {
+ protected ValueReader<?> createStructReader(Types.StructType struct,
+ List<ValueReader<?>> fields,
Map<Integer, ?> idToConstant) {
+ return GenericReaders.struct(struct, fields, idToConstant);
+ }
+
+ private class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
private final Map<Integer, ?> idToConstant;
private ReadBuilder(Map<Integer, ?> idToConstant) {
@@ -112,7 +117,7 @@ public class DataReader<T> implements DatumReader<T> {
@Override
public ValueReader<?> record(Types.StructType struct, Schema record,
List<String> names, List<ValueReader<?>>
fields) {
- return GenericReaders.struct(struct, fields, idToConstant);
+ return createStructReader(struct, fields, idToConstant);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
index fc47dd0..aa6cc6f 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
@@ -40,7 +40,7 @@ public class DataWriter<T> implements DatumWriter<T> {
return new DataWriter<>(schema);
}
- private DataWriter(Schema schema) {
+ protected DataWriter(Schema schema) {
setSchema(schema);
}
@@ -55,13 +55,14 @@ public class DataWriter<T> implements DatumWriter<T> {
writer.write(datum, out);
}
- private static class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
- private WriteBuilder() {
- }
+ protected ValueWriter<?> createStructWriter(List<ValueWriter<?>> fields) {
+ return GenericWriters.struct(fields);
+ }
+ private class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
@Override
public ValueWriter<?> record(Schema record, List<String> names,
List<ValueWriter<?>> fields) {
- return GenericWriters.struct(fields);
+ return createStructWriter(fields);
}
@Override
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
new file mode 100644
index 0000000..9f1c97f
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+
+public class FlinkAvroReader extends DataReader<Row> {
+
+ public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema
readSchema) {
+ super(expectedSchema, readSchema, ImmutableMap.of());
+ }
+
+ @Override
+ protected ValueReader<?> createStructReader(Types.StructType struct,
+ List<ValueReader<?>> fields,
+ Map<Integer, ?> idToConstant) {
+ return new RowReader(fields, struct, idToConstant);
+ }
+
+ private static class RowReader extends ValueReaders.StructReader<Row> {
+ private final Types.StructType structType;
+
+ private RowReader(List<ValueReader<?>> readers, Types.StructType struct,
Map<Integer, ?> idToConstant) {
+ super(readers, struct, idToConstant);
+ this.structType = struct;
+ }
+
+ @Override
+ protected Row reuseOrCreate(Object reuse) {
+ if (reuse instanceof Row) {
+ return (Row) reuse;
+ } else {
+ return new Row(structType.fields().size());
+ }
+ }
+
+ @Override
+ protected Object get(Row row, int pos) {
+ return row.getField(pos);
+ }
+
+ @Override
+ protected void set(Row row, int pos, Object value) {
+ row.setField(pos, value);
+ }
+ }
+}
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
new file mode 100644
index 0000000..fdc0838
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.avro.ValueWriter;
+import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.data.avro.DataWriter;
+
+public class FlinkAvroWriter extends DataWriter<Row> {
+ public FlinkAvroWriter(Schema schema) {
+ super(schema);
+ }
+
+ @Override
+ protected ValueWriter<?> createStructWriter(List<ValueWriter<?>> fields) {
+ return new RowWriter(fields);
+ }
+
+ private static class RowWriter extends ValueWriters.StructWriter<Row> {
+ private RowWriter(List<ValueWriter<?>> writers) {
+ super(writers);
+ }
+
+ @Override
+ protected Object get(Row struct, int pos) {
+ return struct.getField(pos);
+ }
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
index cdf667b..4cd1efc 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
@@ -33,10 +33,40 @@ import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.RandomUtil;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
class RandomData {
private RandomData() {
}
+ static final Schema COMPLEX_SCHEMA = new Schema(
+ required(1, "roots", Types.LongType.get()),
+ optional(3, "lime", Types.ListType.ofRequired(4,
Types.DoubleType.get())),
+ required(5, "strict", Types.StructType.of(
+ required(9, "tangerine", Types.StringType.get()),
+ optional(6, "hopeful", Types.StructType.of(
+ required(7, "steel", Types.FloatType.get()),
+ required(8, "lantern", Types.DateType.get())
+ )),
+ optional(10, "vehement", Types.LongType.get())
+ )),
+ optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13,
+ Types.StringType.get(), Types.TimestampType.withZone())),
+ required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of(
+ optional(16, "beet", Types.DoubleType.get()),
+ required(17, "stamp", Types.FloatType.get()),
+ optional(18, "wheeze", Types.StringType.get())
+ ))),
+ optional(19, "renovate", Types.MapType.ofRequired(20, 21,
+ Types.StringType.get(), Types.StructType.of(
+ optional(22, "jumpy", Types.DoubleType.get()),
+ required(23, "koala", Types.IntegerType.get()),
+ required(24, "couch rope", Types.IntegerType.get())
+ ))),
+ optional(2, "slide", Types.StringType.get())
+ );
+
private static Iterable<Row> generateData(Schema schema, int numRecords,
Supplier<RandomRowGenerator> supplier) {
return () -> new Iterator<Row>() {
private final RandomRowGenerator generator = supplier.get();
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
new file mode 100644
index 0000000..06bf04b
--- /dev/null
+++
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA;
+
+public class TestFlinkAvroReaderWriter {
+ private static final int NUM_RECORDS = 20_000;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private void testCorrectness(Schema schema, int numRecords, Iterable<Row>
iterable) throws IOException {
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<Row> writer = Avro.write(Files.localOutput(testFile))
+ .schema(schema)
+ .createWriterFunc(FlinkAvroWriter::new)
+ .build()) {
+ writer.addAll(iterable);
+ }
+
+ try (CloseableIterable<Row> reader = Avro.read(Files.localInput(testFile))
+ .project(schema)
+ .createReaderFunc(FlinkAvroReader::new)
+ .build()) {
+ Iterator<Row> expected = iterable.iterator();
+ Iterator<Row> rows = reader.iterator();
+ for (int i = 0; i < numRecords; i += 1) {
+ Assert.assertTrue("Should have expected number of rows",
rows.hasNext());
+ Assert.assertEquals(expected.next(), rows.next());
+ }
+ Assert.assertFalse("Should not have extra rows", rows.hasNext());
+ }
+ }
+
+ @Test
+ public void testNormalData() throws IOException {
+ testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS,
RandomData.generate(COMPLEX_SCHEMA, NUM_RECORDS, 19982));
+ }
+}
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
index e0cb3f0..41ea960 100644
---
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
+++
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
@@ -28,14 +28,12 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA;
public class TestFlinkParquetReaderWriter {
private static final int NUM_RECORDS = 20_000;
@@ -43,33 +41,6 @@ public class TestFlinkParquetReaderWriter {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
- private static final Schema COMPLEX_SCHEMA = new Schema(
- required(1, "roots", Types.LongType.get()),
- optional(3, "lime", Types.ListType.ofRequired(4,
Types.DoubleType.get())),
- required(5, "strict", Types.StructType.of(
- required(9, "tangerine", Types.StringType.get()),
- optional(6, "hopeful", Types.StructType.of(
- required(7, "steel", Types.FloatType.get()),
- required(8, "lantern", Types.DateType.get())
- )),
- optional(10, "vehement", Types.LongType.get())
- )),
- optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13,
- Types.StringType.get(), Types.TimestampType.withZone())),
- required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of(
- optional(16, "beet", Types.DoubleType.get()),
- required(17, "stamp", Types.FloatType.get()),
- optional(18, "wheeze", Types.StringType.get())
- ))),
- optional(19, "renovate", Types.MapType.ofRequired(20, 21,
- Types.StringType.get(), Types.StructType.of(
- optional(22, "jumpy", Types.DoubleType.get()),
- required(23, "koala", Types.IntegerType.get()),
- required(24, "couch rope", Types.IntegerType.get())
- ))),
- optional(2, "slide", Types.StringType.get())
- );
-
private void testCorrectness(Schema schema, int numRecords, Iterable<Row>
iterable) throws IOException {
File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
b/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
new file mode 100644
index 0000000..23b4373
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRowProjection {
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private Row writeAndRead(String desc, Schema writeSchema, Schema readSchema,
Row row) throws IOException {
+ File file = temp.newFile(desc + ".avro");
+ Assert.assertTrue(file.delete());
+
+ try (FileAppender<Row> appender = Avro.write(Files.localOutput(file))
+ .schema(writeSchema)
+ .createWriterFunc(FlinkAvroWriter::new)
+ .build()) {
+ appender.add(row);
+ }
+
+ Iterable<Row> records = Avro.read(Files.localInput(file))
+ .project(readSchema)
+ .createReaderFunc(FlinkAvroReader::new)
+ .build();
+
+ return Iterables.getOnlyElement(records);
+ }
+
+ @Test
+ public void testFullProjection() throws Exception {
+ Schema schema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(1, "data", Types.StringType.get())
+ );
+
+ Row row = Row.of(34L, "test");
+
+ Row projected = writeAndRead("full_projection", schema, schema, row);
+
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+
+ int cmp = Comparators.charSequences()
+ .compare("test", (CharSequence) projected.getField(1));
+ Assert.assertEquals("Should contain the correct data value", cmp, 0);
+ }
+
+ @Test
+ public void testSpecialCharacterProjection() throws Exception {
+ Schema schema = new Schema(
+ Types.NestedField.required(0, "user id", Types.LongType.get()),
+ Types.NestedField.optional(1, "data%0", Types.StringType.get())
+ );
+
+ Row row = Row.of(34L, "test");
+
+ Row full = writeAndRead("special_chars", schema, schema, row);
+
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
row.getField(0));
+ Assert.assertEquals("Should contain the correct data value",
+ 0,
+ Comparators.charSequences().compare("test", (CharSequence)
row.getField(1)));
+
+ Row projected = writeAndRead("special_characters", schema,
schema.select("data%0"), row);
+
+ Assert.assertEquals("Should not contain id value", 1,
projected.getArity());
+ Assert.assertEquals("Should contain the correct data value",
+ 0,
+ Comparators.charSequences().compare("test", (CharSequence)
projected.getField(0)));
+ }
+
+ @Test
+ public void testReorderedFullProjection() throws Exception {
+ Schema schema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(1, "data", Types.StringType.get())
+ );
+
+ Row row = Row.of(34L, "test");
+
+ Schema reordered = new Schema(
+ Types.NestedField.optional(1, "data", Types.StringType.get()),
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("full_projection", schema, reordered, row);
+
+ Assert.assertEquals("Should contain the correct 0 value", "test",
projected.getField(0).toString());
+ Assert.assertEquals("Should contain the correct 1 value", 34L,
projected.getField(1));
+ }
+
+ @Test
+ public void testReorderedProjection() throws Exception {
+ Schema schema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(1, "data", Types.StringType.get())
+ );
+
+ Row row = Row.of(34L, "test");
+
+ Schema reordered = new Schema(
+ Types.NestedField.optional(2, "missing_1", Types.StringType.get()),
+ Types.NestedField.optional(1, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "missing_2", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("full_projection", schema, reordered, row);
+
+ Assert.assertNull("Should contain the correct 0 value",
projected.getField(0));
+ Assert.assertEquals("Should contain the correct 1 value", "test",
projected.getField(1).toString());
+ Assert.assertNull("Should contain the correct 2 value",
projected.getField(2));
+ }
+
+ @Test
+ public void testRenamedAddedField() throws Exception {
+ Schema schema = new Schema(
+ Types.NestedField.required(1, "a", Types.LongType.get()),
+ Types.NestedField.required(2, "b", Types.LongType.get()),
+ Types.NestedField.required(3, "d", Types.LongType.get())
+ );
+
+ Row row = Row.of(100L, 200L, 300L);
+
+ Schema renamedAdded = new Schema(
+ Types.NestedField.optional(1, "a", Types.LongType.get()),
+ Types.NestedField.optional(2, "b", Types.LongType.get()),
+ Types.NestedField.optional(3, "c", Types.LongType.get()),
+ Types.NestedField.optional(4, "d", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("rename_and_add_column_projection", schema,
renamedAdded, row);
+ Assert.assertEquals("Should contain the correct value in column 1",
projected.getField(0), 100L);
+ Assert.assertEquals("Should contain the correct value in column 2",
projected.getField(1), 200L);
+ Assert.assertEquals("Should contain the correct value in column 3",
projected.getField(2), 300L);
+ Assert.assertNull("Should contain empty value on new column 4",
projected.getField(3));
+ }
+
+ @Test
+ public void testEmptyProjection() throws Exception {
+ Schema schema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(1, "data", Types.StringType.get())
+ );
+
+ Row row = Row.of(34L, "test");
+
+ Row projected = writeAndRead("empty_projection", schema, schema.select(),
row);
+
+ Assert.assertNotNull("Should read a non-null record", projected);
+ try {
+ projected.getField(0);
+ Assert.fail("Should not retrieve value with ordinal 0");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ // this is expected because there are no values
+ }
+ }
+
+ @Test
+ public void testBasicProjection() throws Exception {
+ Schema writeSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(1, "data", Types.StringType.get())
+ );
+
+ Row row = Row.of(34L, "test");
+
+ Schema idOnly = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("basic_projection_id", writeSchema, idOnly,
row);
+ Assert.assertEquals("Should not project data", 1, projected.getArity());
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+
+ Schema dataOnly = new Schema(
+ Types.NestedField.optional(1, "data", Types.StringType.get())
+ );
+
+ projected = writeAndRead("basic_projection_data", writeSchema, dataOnly,
row);
+
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ int cmp = Comparators.charSequences().compare("test", (CharSequence)
projected.getField(0));
+ Assert.assertEquals("Should contain the correct data value", 0, cmp);
+ }
+
+ @Test
+ public void testRename() throws Exception {
+ Schema writeSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(1, "data", Types.StringType.get())
+ );
+
+ Row row = Row.of(34L, "test");
+
+ Schema readSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(1, "renamed", Types.StringType.get())
+ );
+
+ Row projected = writeAndRead("project_and_rename", writeSchema,
readSchema, row);
+
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+ int cmp = Comparators.charSequences().compare("test", (CharSequence)
projected.getField(1));
+ Assert.assertEquals("Should contain the correct data/renamed value", 0,
cmp);
+ }
+
+ @Test
+ public void testNestedStructProjection() throws Exception {
+ Schema writeSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(3, "location", Types.StructType.of(
+ Types.NestedField.required(1, "lat", Types.FloatType.get()),
+ Types.NestedField.required(2, "long", Types.FloatType.get())
+ ))
+ );
+
+ Row location = Row.of(52.995143f, -1.539054f);
+ Row record = Row.of(34L, location);
+
+ Schema idOnly = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("id_only", writeSchema, idOnly, record);
+ Assert.assertEquals("Should not project location", 1,
projected.getArity());
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+
+ Schema latOnly = new Schema(
+ Types.NestedField.optional(3, "location", Types.StructType.of(
+ Types.NestedField.required(1, "lat", Types.FloatType.get())
+ ))
+ );
+
+ projected = writeAndRead("latitude_only", writeSchema, latOnly, record);
+ Row projectedLocation = (Row) projected.getField(0);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertNotNull("Should project location", projected.getField(0));
+ Assert.assertEquals("Should not project longitude", 1,
projectedLocation.getArity());
+ Assert.assertEquals("Should project latitude",
+ 52.995143f, (float) projectedLocation.getField(0), 0.000001f);
+
+ Schema longOnly = new Schema(
+ Types.NestedField.optional(3, "location", Types.StructType.of(
+ Types.NestedField.required(2, "long", Types.FloatType.get())
+ ))
+ );
+
+ projected = writeAndRead("longitude_only", writeSchema, longOnly, record);
+ projectedLocation = (Row) projected.getField(0);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertNotNull("Should project location", projected.getField(0));
+    Assert.assertEquals("Should not project latitude", 1,
projectedLocation.getArity());
+ Assert.assertEquals("Should project longitude",
+ -1.539054f, (float) projectedLocation.getField(0), 0.000001f);
+
+ Schema locationOnly = writeSchema.select("location");
+ projected = writeAndRead("location_only", writeSchema, locationOnly,
record);
+ projectedLocation = (Row) projected.getField(0);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertNotNull("Should project location", projected.getField(0));
+ Assert.assertEquals("Should project latitude",
+ 52.995143f, (float) projectedLocation.getField(0), 0.000001f);
+ Assert.assertEquals("Should project longitude",
+ -1.539054f, (float) projectedLocation.getField(1), 0.000001f);
+ }
+
+ @Test
+ public void testMapProjection() throws IOException {
+ Schema writeSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(5, "properties",
+ Types.MapType.ofOptional(6, 7, Types.StringType.get(),
Types.StringType.get()))
+ );
+
+ Map<String, String> properties = ImmutableMap.of("a", "A", "b", "B");
+
+ Row row = Row.of(34L, properties);
+
+ Schema idOnly = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("id_only", writeSchema, idOnly, row);
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+ Assert.assertEquals("Should not project properties map", 1,
projected.getArity());
+
+ Schema keyOnly = writeSchema.select("properties.key");
+ projected = writeAndRead("key_only", writeSchema, keyOnly, row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertEquals("Should project entire map", properties,
toStringMap((Map) projected.getField(0)));
+
+ Schema valueOnly = writeSchema.select("properties.value");
+ projected = writeAndRead("value_only", writeSchema, valueOnly, row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertEquals("Should project entire map",
+ properties, toStringMap((Map) projected.getField(0)));
+
+ Schema mapOnly = writeSchema.select("properties");
+ projected = writeAndRead("map_only", writeSchema, mapOnly, row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertEquals("Should project entire map", properties,
toStringMap((Map) projected.getField(0)));
+ }
+
+ private Map<String, ?> toStringMap(Map<?, ?> map) {
+ Map<String, Object> stringMap = Maps.newHashMap();
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ if (entry.getValue() instanceof CharSequence) {
+ stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+ } else {
+ stringMap.put(entry.getKey().toString(), entry.getValue());
+ }
+ }
+ return stringMap;
+ }
+
+ @Test
+ public void testMapOfStructsProjection() throws IOException {
+ Schema writeSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6,
7,
+ Types.StringType.get(),
+ Types.StructType.of(
+ Types.NestedField.required(1, "lat", Types.FloatType.get()),
+ Types.NestedField.required(2, "long", Types.FloatType.get())
+ )
+ ))
+ );
+
+ Row l1 = Row.of(53.992811f, -1.542616f);
+ Row l2 = Row.of(52.995143f, -1.539054f);
+ Row row = Row.of(34L, ImmutableMap.of("L1", l1, "L2", l2));
+
+ Schema idOnly = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("id_only", writeSchema, idOnly, row);
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+ Assert.assertEquals("Should not project locations map", 1,
projected.getArity());
+
+ projected = writeAndRead("all_locations", writeSchema,
writeSchema.select("locations"), row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertEquals("Should project locations map",
+ row.getField(1), toStringMap((Map) projected.getField(0)));
+
+ projected = writeAndRead("lat_only", writeSchema,
writeSchema.select("locations.lat"), row);
+ Map<String, ?> locations = toStringMap((Map) projected.getField(0));
+ Assert.assertNotNull("Should project locations map", locations);
+ Assert.assertEquals("Should contain L1 and L2",
+ Sets.newHashSet("L1", "L2"), locations.keySet());
+ Row projectedL1 = (Row) locations.get("L1");
+ Assert.assertNotNull("L1 should not be null", projectedL1);
+ Assert.assertEquals("L1 should contain lat",
+ 53.992811f, (float) projectedL1.getField(0), 0.000001);
+ Assert.assertEquals("L1 should not contain long", 1,
projectedL1.getArity());
+ Row projectedL2 = (Row) locations.get("L2");
+ Assert.assertNotNull("L2 should not be null", projectedL2);
+ Assert.assertEquals("L2 should contain lat",
+ 52.995143f, (float) projectedL2.getField(0), 0.000001);
+ Assert.assertEquals("L2 should not contain long", 1,
projectedL2.getArity());
+
+ projected = writeAndRead("long_only",
+ writeSchema, writeSchema.select("locations.long"), row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ locations = toStringMap((Map) projected.getField(0));
+ Assert.assertNotNull("Should project locations map", locations);
+ Assert.assertEquals("Should contain L1 and L2",
+ Sets.newHashSet("L1", "L2"), locations.keySet());
+ projectedL1 = (Row) locations.get("L1");
+ Assert.assertNotNull("L1 should not be null", projectedL1);
+ Assert.assertEquals("L1 should not contain lat", 1,
projectedL1.getArity());
+ Assert.assertEquals("L1 should contain long",
+ -1.542616f, (float) projectedL1.getField(0), 0.000001);
+ projectedL2 = (Row) locations.get("L2");
+ Assert.assertNotNull("L2 should not be null", projectedL2);
+ Assert.assertEquals("L2 should not contain lat", 1,
projectedL2.getArity());
+ Assert.assertEquals("L2 should contain long",
+ -1.539054f, (float) projectedL2.getField(0), 0.000001);
+
+ Schema latitiudeRenamed = new Schema(
+ Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6,
7,
+ Types.StringType.get(),
+ Types.StructType.of(
+ Types.NestedField.required(1, "latitude",
Types.FloatType.get())
+ )
+ ))
+ );
+
+ projected = writeAndRead("latitude_renamed", writeSchema,
latitiudeRenamed, row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ locations = toStringMap((Map) projected.getField(0));
+ Assert.assertNotNull("Should project locations map", locations);
+ Assert.assertEquals("Should contain L1 and L2",
+ Sets.newHashSet("L1", "L2"), locations.keySet());
+ projectedL1 = (Row) locations.get("L1");
+ Assert.assertNotNull("L1 should not be null", projectedL1);
+ Assert.assertEquals("L1 should contain latitude",
+ 53.992811f, (float) projectedL1.getField(0), 0.000001);
+ projectedL2 = (Row) locations.get("L2");
+ Assert.assertNotNull("L2 should not be null", projectedL2);
+ Assert.assertEquals("L2 should contain latitude",
+ 52.995143f, (float) projectedL2.getField(0), 0.000001);
+ }
+
+ @Test
+ public void testListProjection() throws IOException {
+ Schema writeSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(10, "values",
+ Types.ListType.ofOptional(11, Types.LongType.get()))
+ );
+
+ List<Long> values = ImmutableList.of(56L, 57L, 58L);
+
+ Row row = Row.of(34L, values);
+
+ Schema idOnly = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("id_only", writeSchema, idOnly, row);
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+ Assert.assertEquals("Should not project values list", 1,
projected.getArity());
+
+ Schema elementOnly = writeSchema.select("values.element");
+ projected = writeAndRead("element_only", writeSchema, elementOnly, row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertEquals("Should project entire list", values,
projected.getField(0));
+
+ Schema listOnly = writeSchema.select("values");
+ projected = writeAndRead("list_only", writeSchema, listOnly, row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertEquals("Should project entire list", values,
projected.getField(0));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testListOfStructsProjection() throws IOException {
+ Schema writeSchema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get()),
+ Types.NestedField.optional(22, "points",
+ Types.ListType.ofOptional(21, Types.StructType.of(
+ Types.NestedField.required(19, "x", Types.IntegerType.get()),
+ Types.NestedField.optional(18, "y", Types.IntegerType.get())
+ ))
+ )
+ );
+
+ Row p1 = Row.of(1, 2);
+ Row p2 = Row.of(3, null);
+ Row row = Row.of(34L, ImmutableList.of(p1, p2));
+
+ Schema idOnly = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ Row projected = writeAndRead("id_only", writeSchema, idOnly, row);
+ Assert.assertEquals("Should contain the correct id value", 34L, (long)
projected.getField(0));
+ Assert.assertEquals("Should not project points list", 1,
projected.getArity());
+
+ projected = writeAndRead("all_points", writeSchema,
writeSchema.select("points"), row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertEquals("Should project points list", row.getField(1),
projected.getField(0));
+
+ projected = writeAndRead("x_only", writeSchema,
writeSchema.select("points.x"), row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertNotNull("Should project points list", projected.getField(0));
+ List<Row> points = (List<Row>) projected.getField(0);
+ Assert.assertEquals("Should read 2 points", 2, points.size());
+ Row projectedP1 = points.get(0);
+ Assert.assertEquals("Should project x", 1, (int) projectedP1.getField(0));
+ Assert.assertEquals("Should not project y", 1, projectedP1.getArity());
+ Row projectedP2 = points.get(1);
+ Assert.assertEquals("Should not project y", 1, projectedP2.getArity());
+ Assert.assertEquals("Should project x", 3, (int) projectedP2.getField(0));
+
+ projected = writeAndRead("y_only", writeSchema,
writeSchema.select("points.y"), row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertNotNull("Should project points list", projected.getField(0));
+ points = (List<Row>) projected.getField(0);
+ Assert.assertEquals("Should read 2 points", 2, points.size());
+ projectedP1 = points.get(0);
+ Assert.assertEquals("Should not project x", 1, projectedP1.getArity());
+ Assert.assertEquals("Should project y", 2, (int) projectedP1.getField(0));
+ projectedP2 = points.get(1);
+ Assert.assertEquals("Should not project x", 1, projectedP2.getArity());
+ Assert.assertNull("Should project null y", projectedP2.getField(0));
+
+ Schema yRenamed = new Schema(
+ Types.NestedField.optional(22, "points",
+ Types.ListType.ofOptional(21, Types.StructType.of(
+ Types.NestedField.optional(18, "z", Types.IntegerType.get())
+ ))
+ )
+ );
+
+ projected = writeAndRead("y_renamed", writeSchema, yRenamed, row);
+ Assert.assertEquals("Should not project id", 1, projected.getArity());
+ Assert.assertNotNull("Should project points list", projected.getField(0));
+ points = (List<Row>) projected.getField(0);
+ Assert.assertEquals("Should read 2 points", 2, points.size());
+ projectedP1 = points.get(0);
+ Assert.assertEquals("Should not project x and y", 1,
projectedP1.getArity());
+ Assert.assertEquals("Should project z", 2, (int) projectedP1.getField(0));
+ projectedP2 = points.get(1);
+ Assert.assertEquals("Should not project x and y", 1,
projectedP2.getArity());
+ Assert.assertNull("Should project null z", projectedP2.getField(0));
+ }
+
+ @Test
+ public void testAddedFieldsWithRequiredChildren() throws Exception {
+ Schema schema = new Schema(
+ Types.NestedField.required(1, "a", Types.LongType.get())
+ );
+
+ Row row = Row.of(100L);
+
+ Schema addedFields = new Schema(
+ Types.NestedField.optional(1, "a", Types.LongType.get()),
+ Types.NestedField.optional(2, "b", Types.StructType.of(
+ Types.NestedField.required(3, "c", Types.LongType.get())
+ )),
+ Types.NestedField.optional(4, "d", Types.ListType.ofRequired(5,
Types.LongType.get())),
+ Types.NestedField.optional(6, "e", Types.MapType.ofRequired(7, 8,
Types.LongType.get(), Types.LongType.get()))
+ );
+
+ Row projected =
writeAndRead("add_fields_with_required_children_projection", schema,
addedFields, row);
+ Assert.assertEquals("Should contain the correct value in column 1",
projected.getField(0), 100L);
+ Assert.assertNull("Should contain empty value in new column 2",
projected.getField(1));
+ Assert.assertNull("Should contain empty value in new column 4",
projected.getField(2));
+ Assert.assertNull("Should contain empty value in new column 6",
projected.getField(3));
+ }
+}