http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java deleted file mode 100644 index 0c9c549..0000000 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; - -import org.apache.orc.OrcConf; -import org.apache.orc.OrcFile; -import org.apache.orc.Reader; -import org.apache.orc.RecordReader; -import org.apache.orc.TypeDescription; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import static org.apache.flink.orc.OrcUtils.fillRows; - -/** - * InputFormat to read ORC data. - * For Optimization, reading is done in batch instead of a single row. 
- */ -public class RowOrcInputFormat - extends FileInputFormat<Row> - implements ResultTypeQueryable<Row> { - - private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class); - private static final int BATCH_SIZE = 1024; - - private org.apache.hadoop.conf.Configuration config; - private TypeDescription schema; - private int[] fieldMapping; - - private transient RowTypeInfo rowType; - private transient RecordReader orcRowsReader; - private transient VectorizedRowBatch rowBatch; - private transient Row[] rows; - - private transient int rowInBatch; - - public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) { - this(path, TypeDescription.fromString(schemaString), orcConfig); - } - - public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) { - super(new Path(path)); - this.unsplittable = false; - this.schema = orcSchema; - this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema); - this.config = orcConfig; - - this.fieldMapping = new int[this.schema.getChildren().size()]; - for (int i = 0; i < fieldMapping.length; i++) { - this.fieldMapping[i] = i; - } - - } - - public void setFieldMapping(int[] fieldMapping) { - this.fieldMapping = fieldMapping; - // adapt result type - - TypeInformation[] fieldTypes = new TypeInformation[fieldMapping.length]; - String[] fieldNames = new String[fieldMapping.length]; - for (int i = 0; i < fieldMapping.length; i++) { - fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]); - fieldNames[i] = this.rowType.getFieldNames()[fieldMapping[i]]; - } - this.rowType = new RowTypeInfo(fieldTypes, fieldNames); - } - - private boolean[] computeProjectionMask() { - boolean[] projectionMask = new boolean[schema.getMaximumId() + 1]; - for (int inIdx : fieldMapping) { - TypeDescription fieldSchema = schema.getChildren().get(inIdx); - for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) { - projectionMask[i] = true; - } - } - return projectionMask; 
- } - - @Override - public void openInputFormat() throws IOException { - super.openInputFormat(); - this.rows = new Row[BATCH_SIZE]; - for (int i = 0; i < BATCH_SIZE; i++) { - rows[i] = new Row(fieldMapping.length); - } - } - - @Override - public void open(FileInputSplit fileSplit) throws IOException { - - this.currentSplit = fileSplit; - Preconditions.checkArgument(this.splitStart == 0, "ORC files must be read from the start."); - - if (LOG.isDebugEnabled()) { - LOG.debug("Opening ORC file " + fileSplit.getPath()); - } - - org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath()); - - Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(config)); - - Reader.Options options = orcReader.options() - .range(fileSplit.getStart(), fileSplit.getLength()) - .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(config)) - .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(config)) - .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(config)); - - options.include(computeProjectionMask()); - - // check that schema of file is as expected - if (!this.schema.equals(orcReader.getSchema())) { - - throw new RuntimeException("Invalid schema for file at " + this.filePath + - " Expected:" + this.schema + " Actual: " + orcReader.getSchema()); - } - - this.orcRowsReader = orcReader.rows(options); - - // assign ids - this.schema.getId(); - - this.rowBatch = schema.createRowBatch(BATCH_SIZE); - rowInBatch = 0; - } - - @Override - public void close() throws IOException { - - if (orcRowsReader != null) { - this.orcRowsReader.close(); - } - this.orcRowsReader = null; - - } - - @Override - public void closeInputFormat() throws IOException { - this.rows = null; - this.rows = null; - this.schema = null; - this.rowBatch = null; - } - - @Override - public boolean reachedEnd() throws IOException { - return !ensureBatch(); - } - - private boolean ensureBatch() throws IOException { - - if (rowInBatch >= rowBatch.size) { - rowInBatch = 
0; - boolean moreRows = orcRowsReader.nextBatch(rowBatch); - - if (moreRows) { - // read rows - fillRows(rows, schema, rowBatch, fieldMapping); - } - return moreRows; - } - - return true; - } - - @Override - public Row nextRecord(Row reuse) throws IOException { - return rows[this.rowInBatch++]; - } - - @Override - public TypeInformation<Row> getProducedType() { - return rowType; - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - this.config.write(out); - out.writeUTF(schema.toString()); - - out.writeInt(fieldMapping.length); - for (int f : fieldMapping) { - out.writeInt(f); - } - - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - - org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); - configuration.readFields(in); - - if (this.config == null) { - this.config = configuration; - } - this.schema = TypeDescription.fromString(in.readUTF()); - - this.fieldMapping = new int[in.readInt()]; - for (int i = 0; i < fieldMapping.length; i++) { - this.fieldMapping[i] = in.readInt(); - } - - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java new file mode 100644 index 0000000..0efe41f --- /dev/null +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java @@ -0,0 +1,795 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.orc.Reader; +import org.apache.orc.StripeInformation; +import org.junit.After; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link OrcRowInputFormat}. 
+ * + */ +public class OrcRowInputFormatTest { + + private OrcRowInputFormat rowOrcInputFormat; + + @After + public void tearDown() throws IOException { + if (rowOrcInputFormat != null) { + rowOrcInputFormat.close(); + rowOrcInputFormat.closeInputFormat(); + } + rowOrcInputFormat = null; + } + + private static final String TEST_FILE_FLAT = "test-data-flat.orc"; + private static final String TEST_SCHEMA_FLAT = + "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>"; + + private static final String TEST_FILE_NESTED = "test-data-nested.orc"; + private static final String TEST_SCHEMA_NESTED = + "struct<" + + "boolean1:boolean," + + "byte1:tinyint," + + "short1:smallint," + + "int1:int," + + "long1:bigint," + + "float1:float," + + "double1:double," + + "bytes1:binary," + + "string1:string," + + "middle:struct<" + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">," + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">," + + "map:map<" + + "string," + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">"; + + private static final String TEST_FILE_TIMETYPES = "test-data-timetypes.orc"; + private static final String TEST_SCHEMA_TIMETYPES = "struct<time:timestamp,date:date>"; + + private static final String TEST_FILE_DECIMAL = "test-data-decimal.orc"; + private static final String TEST_SCHEMA_DECIMAL = "struct<_col0:decimal(10,5)>"; + + private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc"; + private static final String TEST_SCHEMA_NESTEDLIST = "struct<mylist1:array<array<struct<mylong1:bigint>>>>"; + + @Test(expected = FileNotFoundException.class) + public void testInvalidPath() throws IOException{ + rowOrcInputFormat = + new OrcRowInputFormat("/does/not/exist", TEST_SCHEMA_FLAT, new Configuration()); + rowOrcInputFormat.openInputFormat(); + FileInputSplit[] inputSplits = 
rowOrcInputFormat.createInputSplits(1); + rowOrcInputFormat.open(inputSplits[0]); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidProjection1() throws IOException{ + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); + int[] projectionMask = {1, 2, 3, -1}; + rowOrcInputFormat.selectFields(projectionMask); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidProjection2() throws IOException{ + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); + int[] projectionMask = {1, 2, 3, 9}; + rowOrcInputFormat.selectFields(projectionMask); + } + + @Test + public void testProjectionMaskNested() throws IOException{ + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); + + OrcRowInputFormat spy = spy(rowOrcInputFormat); + + // mock options to check configuration of ORC reader + Reader.Options options = new Reader.Options(); + doReturn(options).when(spy).getOptions(any()); + + spy.selectFields(9, 11, 2); + spy.openInputFormat(); + FileInputSplit[] splits = spy.createInputSplits(1); + spy.open(splits[0]); + + // top-level struct is false + boolean[] expected = new boolean[]{ + false, // top level + false, false, // flat fields 0, 1 are out + true, // flat field 2 is in + false, false, false, false, false, false, // flat fields 3, 4, 5, 6, 7, 8 are out + true, true, true, true, true, // nested field 9 is in + false, false, false, false, // nested field 10 is out + true, true, true, true, true}; // nested field 11 is in + assertArrayEquals(expected, options.getInclude()); + } + + @Test + public void testSplitStripesGivenSplits() throws IOException { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); + + OrcRowInputFormat spy = spy(rowOrcInputFormat); + + // mock options to check 
configuration of ORC reader + Reader.Options options = spy(new Reader.Options()); + doReturn(options).when(spy).getOptions(any()); + + FileInputSplit[] splits = spy.createInputSplits(3); + + spy.openInputFormat(); + spy.open(splits[0]); + verify(options).range(eq(3L), eq(137005L)); + spy.open(splits[1]); + verify(options).range(eq(137008L), eq(136182L)); + spy.open(splits[2]); + verify(options).range(eq(273190L), eq(123633L)); + } + + @Test + public void testSplitStripesCustomSplits() throws IOException { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); + + OrcRowInputFormat spy = spy(rowOrcInputFormat); + + // mock list of stripes + List<StripeInformation> stripes = new ArrayList<>(); + StripeInformation stripe1 = mock(StripeInformation.class); + when(stripe1.getOffset()).thenReturn(10L); + when(stripe1.getLength()).thenReturn(90L); + StripeInformation stripe2 = mock(StripeInformation.class); + when(stripe2.getOffset()).thenReturn(100L); + when(stripe2.getLength()).thenReturn(100L); + StripeInformation stripe3 = mock(StripeInformation.class); + when(stripe3.getOffset()).thenReturn(200L); + when(stripe3.getLength()).thenReturn(100L); + StripeInformation stripe4 = mock(StripeInformation.class); + when(stripe4.getOffset()).thenReturn(300L); + when(stripe4.getLength()).thenReturn(100L); + StripeInformation stripe5 = mock(StripeInformation.class); + when(stripe5.getOffset()).thenReturn(400L); + when(stripe5.getLength()).thenReturn(100L); + stripes.add(stripe1); + stripes.add(stripe2); + stripes.add(stripe3); + stripes.add(stripe4); + stripes.add(stripe5); + doReturn(stripes).when(spy).getStripes(any()); + + // mock options to check configuration of ORC reader + Reader.Options options = spy(new Reader.Options()); + doReturn(options).when(spy).getOptions(any()); + + spy.openInputFormat(); + // split ranging 2 stripes + spy.open(new FileInputSplit(0, new Path(getPath(TEST_FILE_FLAT)), 0, 150, new String[]{})); 
+ verify(options).range(eq(10L), eq(190L)); + // split ranging 0 stripes + spy.open(new FileInputSplit(1, new Path(getPath(TEST_FILE_FLAT)), 150, 10, new String[]{})); + verify(options).range(eq(0L), eq(0L)); + // split ranging 1 stripe + spy.open(new FileInputSplit(2, new Path(getPath(TEST_FILE_FLAT)), 160, 41, new String[]{})); + verify(options).range(eq(200L), eq(100L)); + // split ranging 2 stripe + spy.open(new FileInputSplit(3, new Path(getPath(TEST_FILE_FLAT)), 201, 299, new String[]{})); + verify(options).range(eq(300L), eq(200L)); + } + + @Test + public void testProducedType() throws IOException { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); + + assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo); + RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType(); + + assertArrayEquals( + new TypeInformation[]{ + // primitives + Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE, + // binary + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + // string + Types.STRING, + // struct + Types.ROW_NAMED( + new String[]{"list"}, + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))), + // list + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING)), + // map + new MapTypeInfo<>(Types.STRING, Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING)) + }, + producedType.getFieldTypes()); + assertArrayEquals( + new String[]{"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", "bytes1", "string1", "middle", "list", "map"}, + producedType.getFieldNames()); + } + + @Test + public void testProducedTypeWithProjection() throws IOException { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); + + 
rowOrcInputFormat.selectFields(9, 3, 7, 10); + + assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo); + RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType(); + + assertArrayEquals( + new TypeInformation[]{ + // struct + Types.ROW_NAMED( + new String[]{"list"}, + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))), + // int + Types.INT, + // binary + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + // list + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING)) + }, + producedType.getFieldTypes()); + assertArrayEquals( + new String[]{"middle", "int1", "bytes1", "list"}, + producedType.getFieldNames()); + } + + @Test + public void testSerialization() throws Exception { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); + + rowOrcInputFormat.selectFields(0, 4, 1); + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M")); + + byte[] bytes = InstantiationUtil.serializeObject(rowOrcInputFormat); + OrcRowInputFormat copy = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader()); + + FileInputSplit[] splits = copy.createInputSplits(1); + copy.openInputFormat(); + copy.open(splits[0]); + assertFalse(copy.reachedEnd()); + Row row = copy.nextRecord(null); + + assertNotNull(row); + assertEquals(3, row.getArity()); + // check first row + assertEquals(1, row.getField(0)); + assertEquals(500, row.getField(1)); + assertEquals("M", row.getField(2)); + } + + @Test + public void testNumericBooleanStringPredicates() throws Exception { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); + + rowOrcInputFormat.selectFields(0, 1, 2, 3, 4, 5, 6, 8); + + // boolean pred + rowOrcInputFormat.addPredicate( + new 
OrcRowInputFormat.Equals("boolean1", PredicateLeaf.Type.BOOLEAN, false)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.LessThan("byte1", PredicateLeaf.Type.LONG, 1)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.LessThanEquals("short1", PredicateLeaf.Type.LONG, 1024)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Between("int1", PredicateLeaf.Type.LONG, -1, 65536)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Equals("long1", PredicateLeaf.Type.LONG, 9223372036854775807L)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Equals("float1", PredicateLeaf.Type.FLOAT, 1.0)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Equals("double1", PredicateLeaf.Type.FLOAT, -15.0)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.IsNull("string1", PredicateLeaf.Type.STRING)); + // boolean pred + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello")); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + rowOrcInputFormat.openInputFormat(); + + // mock options to check configuration of ORC reader + OrcRowInputFormat spy = spy(rowOrcInputFormat); + Reader.Options options = new Reader.Options(); + doReturn(options).when(spy).getOptions(any()); + + spy.openInputFormat(); + spy.open(splits[0]); + + // verify predicate configuration + SearchArgument sarg = options.getSearchArgument(); + assertNotNull(sarg); + assertEquals("(and leaf-0 leaf-1 leaf-2 leaf-3 leaf-4 leaf-5 leaf-6 leaf-7 leaf-8)", sarg.getExpression().toString()); + assertEquals(9, sarg.getLeaves().size()); + List<PredicateLeaf> leaves = sarg.getLeaves(); + assertEquals("(EQUALS boolean1 false)", leaves.get(0).toString()); + assertEquals("(LESS_THAN byte1 1)", leaves.get(1).toString()); + assertEquals("(LESS_THAN_EQUALS short1 1024)", 
leaves.get(2).toString()); + assertEquals("(BETWEEN int1 -1 65536)", leaves.get(3).toString()); + assertEquals("(EQUALS long1 9223372036854775807)", leaves.get(4).toString()); + assertEquals("(EQUALS float1 1.0)", leaves.get(5).toString()); + assertEquals("(EQUALS double1 -15.0)", leaves.get(6).toString()); + assertEquals("(IS_NULL string1)", leaves.get(7).toString()); + assertEquals("(EQUALS string1 hello)", leaves.get(8).toString()); + } + + @Test + public void testTimePredicates() throws Exception { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration()); + + rowOrcInputFormat.addPredicate( + // OR + new OrcRowInputFormat.Or( + // timestamp pred + new OrcRowInputFormat.Equals("time", PredicateLeaf.Type.TIMESTAMP, Timestamp.valueOf("1900-05-05 12:34:56.100")), + // date pred + new OrcRowInputFormat.Equals("date", PredicateLeaf.Type.DATE, Date.valueOf("1900-12-25"))) + ); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + rowOrcInputFormat.openInputFormat(); + + // mock options to check configuration of ORC reader + OrcRowInputFormat spy = spy(rowOrcInputFormat); + Reader.Options options = new Reader.Options(); + doReturn(options).when(spy).getOptions(any()); + + spy.openInputFormat(); + spy.open(splits[0]); + + // verify predicate configuration + SearchArgument sarg = options.getSearchArgument(); + assertNotNull(sarg); + assertEquals("(or leaf-0 leaf-1)", sarg.getExpression().toString()); + assertEquals(2, sarg.getLeaves().size()); + List<PredicateLeaf> leaves = sarg.getLeaves(); + assertEquals("(EQUALS time 1900-05-05 12:34:56.1)", leaves.get(0).toString()); + assertEquals("(EQUALS date 1900-12-25)", leaves.get(1).toString()); + } + + @Test + public void testDecimalPredicate() throws Exception { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration()); + + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Not( + // 
decimal pred + new OrcRowInputFormat.Equals("_col0", PredicateLeaf.Type.DECIMAL, BigDecimal.valueOf(-1000.5)))); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + rowOrcInputFormat.openInputFormat(); + + // mock options to check configuration of ORC reader + OrcRowInputFormat spy = spy(rowOrcInputFormat); + Reader.Options options = new Reader.Options(); + doReturn(options).when(spy).getOptions(any()); + + spy.openInputFormat(); + spy.open(splits[0]); + + // verify predicate configuration + SearchArgument sarg = options.getSearchArgument(); + assertNotNull(sarg); + assertEquals("(not leaf-0)", sarg.getExpression().toString()); + assertEquals(1, sarg.getLeaves().size()); + List<PredicateLeaf> leaves = sarg.getLeaves(); + assertEquals("(EQUALS _col0 -1000.5)", leaves.get(0).toString()); + } + + @Test(expected = IllegalArgumentException.class) + public void testPredicateWithInvalidColumn() throws Exception { + rowOrcInputFormat = + new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); + + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Equals("unknown", PredicateLeaf.Type.LONG, 42)); + } + + @Test + public void testReadNestedFile() throws IOException{ + rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + rowOrcInputFormat.openInputFormat(); + rowOrcInputFormat.open(splits[0]); + + assertFalse(rowOrcInputFormat.reachedEnd()); + Row row = rowOrcInputFormat.nextRecord(null); + + // validate first row + assertNotNull(row); + assertEquals(12, row.getArity()); + assertEquals(false, row.getField(0)); + assertEquals((byte) 1, row.getField(1)); + assertEquals((short) 1024, row.getField(2)); + assertEquals(65536, row.getField(3)); + assertEquals(9223372036854775807L, row.getField(4)); + assertEquals(1.0f, row.getField(5)); + assertEquals(-15.0d, 
row.getField(6)); + assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(7)); + assertEquals("hi", row.getField(8)); + // check nested field + assertTrue(row.getField(9) instanceof Row); + Row nested1 = (Row) row.getField(9); + assertEquals(1, nested1.getArity()); + assertTrue(nested1.getField(0) instanceof Object[]); + Object[] nestedList1 = (Object[]) nested1.getField(0); + assertEquals(2, nestedList1.length); + assertEquals(Row.of(1, "bye"), nestedList1[0]); + assertEquals(Row.of(2, "sigh"), nestedList1[1]); + // check list + assertTrue(row.getField(10) instanceof Object[]); + Object[] list1 = (Object[]) row.getField(10); + assertEquals(2, list1.length); + assertEquals(Row.of(3, "good"), list1[0]); + assertEquals(Row.of(4, "bad"), list1[1]); + // check map + assertTrue(row.getField(11) instanceof HashMap); + HashMap map1 = (HashMap) row.getField(11); + assertEquals(0, map1.size()); + + // read second row + assertFalse(rowOrcInputFormat.reachedEnd()); + row = rowOrcInputFormat.nextRecord(null); + + // validate second row + assertNotNull(row); + assertEquals(12, row.getArity()); + assertEquals(true, row.getField(0)); + assertEquals((byte) 100, row.getField(1)); + assertEquals((short) 2048, row.getField(2)); + assertEquals(65536, row.getField(3)); + assertEquals(9223372036854775807L, row.getField(4)); + assertEquals(2.0f, row.getField(5)); + assertEquals(-5.0d, row.getField(6)); + assertArrayEquals(new byte[]{}, (byte[]) row.getField(7)); + assertEquals("bye", row.getField(8)); + // check nested field + assertTrue(row.getField(9) instanceof Row); + Row nested2 = (Row) row.getField(9); + assertEquals(1, nested2.getArity()); + assertTrue(nested2.getField(0) instanceof Object[]); + Object[] nestedList2 = (Object[]) nested2.getField(0); + assertEquals(2, nestedList2.length); + assertEquals(Row.of(1, "bye"), nestedList2[0]); + assertEquals(Row.of(2, "sigh"), nestedList2[1]); + // check list + assertTrue(row.getField(10) instanceof Object[]); + Object[] 
list2 = (Object[]) row.getField(10); + assertEquals(3, list2.length); + assertEquals(Row.of(100000000, "cat"), list2[0]); + assertEquals(Row.of(-100000, "in"), list2[1]); + assertEquals(Row.of(1234, "hat"), list2[2]); + // check map + assertTrue(row.getField(11) instanceof HashMap); + HashMap map = (HashMap) row.getField(11); + assertEquals(2, map.size()); + assertEquals(Row.of(5, "chani"), map.get("chani")); + assertEquals(Row.of(1, "mauddib"), map.get("mauddib")); + + assertTrue(rowOrcInputFormat.reachedEnd()); + } + + @Test + public void testReadTimeTypeFile() throws IOException{ + rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration()); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + rowOrcInputFormat.openInputFormat(); + rowOrcInputFormat.open(splits[0]); + + assertFalse(rowOrcInputFormat.reachedEnd()); + Row row = rowOrcInputFormat.nextRecord(null); + + // validate first row + assertNotNull(row); + assertEquals(2, row.getArity()); + assertEquals(Timestamp.valueOf("1900-05-05 12:34:56.1"), row.getField(0)); + assertEquals(Date.valueOf("1900-12-25"), row.getField(1)); + + // check correct number of rows + long cnt = 1; + while (!rowOrcInputFormat.reachedEnd()) { + assertNotNull(rowOrcInputFormat.nextRecord(null)); + cnt++; + } + assertEquals(70000, cnt); + } + + @Test + public void testReadDecimalTypeFile() throws IOException{ + rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration()); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + rowOrcInputFormat.openInputFormat(); + rowOrcInputFormat.open(splits[0]); + + assertFalse(rowOrcInputFormat.reachedEnd()); + Row row = rowOrcInputFormat.nextRecord(null); + + // validate first row + assertNotNull(row); + assertEquals(1, row.getArity()); + assertEquals(BigDecimal.valueOf(-1000.5d), 
row.getField(0)); + + // check correct number of rows + long cnt = 1; + while (!rowOrcInputFormat.reachedEnd()) { + assertNotNull(rowOrcInputFormat.nextRecord(null)); + cnt++; + } + assertEquals(6000, cnt); + } + + @Test + public void testReadNestedListFile() throws Exception { + rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTEDLIST), TEST_SCHEMA_NESTEDLIST, new Configuration()); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + rowOrcInputFormat.openInputFormat(); + rowOrcInputFormat.open(splits[0]); + + assertFalse(rowOrcInputFormat.reachedEnd()); + + Row row = null; + long cnt = 0; + + // read all rows + while (!rowOrcInputFormat.reachedEnd()) { + + row = rowOrcInputFormat.nextRecord(row); + assertEquals(1, row.getArity()); + + // outer list + Object[] list = (Object[]) row.getField(0); + assertEquals(1, list.length); + + // nested list of rows + Row[] nestedRows = (Row[]) list[0]; + assertEquals(1, nestedRows.length); + assertEquals(1, nestedRows[0].getArity()); + + // verify list value + assertEquals(cnt, nestedRows[0].getField(0)); + cnt++; + } + // number of rows in file + assertEquals(100, cnt); + } + + @Test + public void testReadWithProjection() throws IOException{ + rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); + + rowOrcInputFormat.selectFields(7, 0, 10, 8); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + rowOrcInputFormat.openInputFormat(); + rowOrcInputFormat.open(splits[0]); + + assertFalse(rowOrcInputFormat.reachedEnd()); + Row row = rowOrcInputFormat.nextRecord(null); + + // validate first row + assertNotNull(row); + assertEquals(4, row.getArity()); + // check binary + assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(0)); + // check boolean + assertEquals(false, row.getField(1)); + // check list + assertTrue(row.getField(2) instanceof 
Object[]); + Object[] list1 = (Object[]) row.getField(2); + assertEquals(2, list1.length); + assertEquals(Row.of(3, "good"), list1[0]); + assertEquals(Row.of(4, "bad"), list1[1]); + // check string + assertEquals("hi", row.getField(3)); + + // check that there is a second row with four fields + assertFalse(rowOrcInputFormat.reachedEnd()); + row = rowOrcInputFormat.nextRecord(null); + assertNotNull(row); + assertEquals(4, row.getArity()); + assertTrue(rowOrcInputFormat.reachedEnd()); + } + + @Test + public void testReadFileInSplits() throws IOException{ + + rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); + rowOrcInputFormat.selectFields(0, 1); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(4); + assertEquals(4, splits.length); + rowOrcInputFormat.openInputFormat(); + + long cnt = 0; + // read all splits + for (FileInputSplit split : splits) { + + // open split + rowOrcInputFormat.open(split); + // read and count all rows + while (!rowOrcInputFormat.reachedEnd()) { + assertNotNull(rowOrcInputFormat.nextRecord(null)); + cnt++; + } + } + // check that all rows have been read + assertEquals(1920800, cnt); + } + + @Test + public void testReadFileWithFilter() throws IOException{ + + rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); + rowOrcInputFormat.selectFields(0, 1); + + // read head and tail of file + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Or( + new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L), + new OrcRowInputFormat.Not( + new OrcRowInputFormat.LessThanEquals("_col0", PredicateLeaf.Type.LONG, 1920000L)) + )); + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M")); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + rowOrcInputFormat.openInputFormat(); + + // open split + 
rowOrcInputFormat.open(splits[0]); + + // read and count all rows + long cnt = 0; + while (!rowOrcInputFormat.reachedEnd()) { + assertNotNull(rowOrcInputFormat.nextRecord(null)); + cnt++; + } + // check that only the first and last stripes of the file have been read. + // Each stripe has 5000 rows, except the last which has 800 rows. + assertEquals(5800, cnt); + } + + @Test + public void testReadFileWithEvolvedSchema() throws IOException{ + + rowOrcInputFormat = new OrcRowInputFormat( + getPath(TEST_FILE_FLAT), + "struct<_col0:int,_col1:string,_col4:string,_col3:string>", // previous version of schema + new Configuration()); + rowOrcInputFormat.selectFields(3, 0, 2); + + rowOrcInputFormat.addPredicate( + new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L)); + + FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + rowOrcInputFormat.openInputFormat(); + + // open split + rowOrcInputFormat.open(splits[0]); + + // read and validate first row + assertFalse(rowOrcInputFormat.reachedEnd()); + Row row = rowOrcInputFormat.nextRecord(null); + assertNotNull(row); + assertEquals(3, row.getArity()); + assertEquals("Primary", row.getField(0)); + assertEquals(1, row.getField(1)); + assertEquals("M", row.getField(2)); + + // read and count remaining rows + long cnt = 1; + while (!rowOrcInputFormat.reachedEnd()) { + assertNotNull(rowOrcInputFormat.nextRecord(null)); + cnt++; + } + // check that only the first stripe of the file has been read (the predicate _col0 < 10 excludes all later stripes). + // Each stripe has 5000 rows, except the last which has 800 rows. 
+ assertEquals(5000, cnt); + } + + private String getPath(String fileName) { + return getClass().getClassLoader().getResource(fileName).getPath(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java index 3de6ab3..e6ef1e1 100644 --- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java @@ -18,125 +18,101 @@ package org.apache.flink.orc; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.Row; -import org.hamcrest.CoreMatchers; -import org.junit.Assert; import org.junit.Test; -import java.net.URL; -import java.util.ArrayList; import java.util.List; +import static org.junit.Assert.assertEquals; + /** * Tests for {@link OrcTableSource}. 
*/ public class OrcTableSourceITCase extends MultipleProgramsTestBase { - private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," + - "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," + - "middle:struct<list:array<struct<int1:int,string1:string>>>," + - "list:array<struct<int1:int,string1:string>>," + - "map:map<string,struct<int1:int,string1:string>>>"; - - private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc"); - - - private static final String[] TEST1_DATA = new String[] { - "false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}", - "true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," + - "[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" }; + private static final String TEST_FILE_FLAT = "test-data-flat.orc"; + private static final String TEST_SCHEMA_FLAT = + "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>"; public OrcTableSourceITCase() { super(TestExecutionMode.COLLECTION); } @Test - public void testOrcTableSource() throws Exception { + public void testFullScan() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); - assert (test1URL != null); - OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA); - - tEnv.registerTableSource("orcTable", orc); - - String query = "Select * from orcTable"; - Table t = tEnv.sql(query); + OrcTableSource orc = OrcTableSource.builder() + .path(getPath(TEST_FILE_FLAT)) + .forOrcSchema(TEST_SCHEMA_FLAT) + .build(); + tEnv.registerTableSource("OrcTable", orc); + + String query = + "SELECT COUNT(*), " + + "MIN(_col0), MAX(_col0), " + + "MIN(_col1), MAX(_col1), " + + "MIN(_col2), MAX(_col2), " + + "MIN(_col3), MAX(_col3), " + + 
"MIN(_col4), MAX(_col4), " + + "MIN(_col5), MAX(_col5), " + + "MIN(_col6), MAX(_col6), " + + "MIN(_col7), MAX(_col7), " + + "MIN(_col8), MAX(_col8) " + + "FROM OrcTable"; + Table t = tEnv.sqlQuery(query); DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class); - List<Row> records = dataSet.collect(); - - Assert.assertEquals(records.size(), 2); + List<Row> result = dataSet.collect(); - List<String> actualRecords = new ArrayList<>(); - for (Row record : records) { - Assert.assertEquals(record.getArity(), 12); - actualRecords.add(record.toString()); - } - - Assert.assertThat(actualRecords, CoreMatchers.hasItems(TEST1_DATA)); + assertEquals(1, result.size()); + assertEquals( + "1920800,1,1920800,F,M,D,W,2 yr Degree,Unknown,500,10000,Good,Unknown,0,6,0,6,0,6", + result.get(0).toString()); } @Test - public void testOrcTableProjection() throws Exception { + public void testScanWithProjectionAndFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); - assert(test1URL != null); - OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA); - - tEnv.registerTableSource("orcTable", orc); - - String query = "Select middle,list,map from orcTable"; - Table t = tEnv.sql(query); - - String[] colNames = new String[] {"middle", "list", "map"}; - - RowTypeInfo rowTypeInfo = new RowTypeInfo( - new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO}, - new String[] {"int1", "string1"}); - - RowTypeInfo structTypeInfo = new RowTypeInfo( - new TypeInformation[] {ObjectArrayTypeInfo.getInfoFor(rowTypeInfo)}, - new String[] {"list"}); - - TypeInformation[] colTypes = new TypeInformation[] { - structTypeInfo, - ObjectArrayTypeInfo.getInfoFor(rowTypeInfo), - new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo) - }; - - TableSchema actualTableSchema = new TableSchema(colNames, colTypes); - - 
Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames); - Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes); - Assert.assertEquals(actualTableSchema.toString(), t.getSchema().toString()); + OrcTableSource orc = OrcTableSource.builder() + .path(getPath(TEST_FILE_FLAT)) + .forOrcSchema(TEST_SCHEMA_FLAT) + .build(); + tEnv.registerTableSource("OrcTable", orc); + + String query = + "SELECT " + + "MIN(_col4), MAX(_col4), " + + "MIN(_col3), MAX(_col3), " + + "MIN(_col0), MAX(_col0), " + + "MIN(_col2), MAX(_col2), " + + "COUNT(*) " + + "FROM OrcTable " + + "WHERE (_col0 BETWEEN 4975 and 5024 OR _col0 BETWEEN 9975 AND 10024) AND _col1 = 'F'"; + Table t = tEnv.sqlQuery(query); DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class); - List<Row> records = dataSet.collect(); - - Assert.assertEquals(records.size(), 2); - for (Row record: records) { - Assert.assertEquals(record.getArity(), 3); - } + List<Row> result = dataSet.collect(); + assertEquals(1, result.size()); + assertEquals( + "1500,6000,2 yr Degree,Unknown,4976,10024,D,W,50", + result.get(0).toString()); } + private String getPath(String fileName) { + return getClass().getClassLoader().getResource(fileName).getPath(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java index c285054..b654f76 100644 --- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java @@ -18,96 +18,248 @@ package org.apache.flink.orc; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import 
org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.expressions.ArrayElementAt; +import org.apache.flink.table.expressions.EqualTo; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.GetCompositeField; +import org.apache.flink.table.expressions.GreaterThan; +import org.apache.flink.table.expressions.Literal; +import org.apache.flink.table.expressions.ResolvedFieldReference; +import org.apache.flink.types.Row; -import org.junit.Assert; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.junit.Test; +import org.mockito.ArgumentCaptor; -import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Unit Tests for {@link OrcTableSource}. 
*/ public class OrcTableSourceTest { - private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," + - "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," + - "middle:struct<list:array<struct<int1:int,string1:string>>>," + - "list:array<struct<int1:int,string1:string>>," + - "map:map<string,struct<int1:int,string1:string>>>"; + private static final String TEST_FILE_NESTED = "test-data-nested.orc"; + private static final String TEST_SCHEMA_NESTED = + "struct<" + + "boolean1:boolean," + + "byte1:tinyint," + + "short1:smallint," + + "int1:int," + + "long1:bigint," + + "float1:float," + + "double1:double," + + "bytes1:binary," + + "string1:string," + + "middle:struct<" + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">," + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">," + + "map:map<" + + "string," + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">"; + + @Test + public void testGetReturnType() throws Exception { + + OrcTableSource orc = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED)) + .forOrcSchema(TEST_SCHEMA_NESTED) + .build(); - private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc"); + TypeInformation<Row> returnType = orc.getReturnType(); + assertNotNull(returnType); + assertTrue(returnType instanceof RowTypeInfo); + RowTypeInfo rowType = (RowTypeInfo) returnType; + + RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()); + assertEquals(expected, rowType); + } @Test - public void testOrcSchema() throws Exception { + public void testGetTableSchema() throws Exception { + + OrcTableSource orc = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED)) + .forOrcSchema(TEST_SCHEMA_NESTED) + .build(); + + TableSchema schema = orc.getTableSchema(); + assertNotNull(schema); + assertArrayEquals(getNestedFieldNames(), 
schema.getColumnNames()); + assertArrayEquals(getNestedFieldTypes(), schema.getTypes()); + } + + @Test + public void testProjectFields() throws Exception { + + OrcTableSource orc = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED)) + .forOrcSchema(TEST_SCHEMA_NESTED) + .build(); + + OrcTableSource projected = (OrcTableSource) orc.projectFields(new int[]{3, 5, 1, 0}); - assert(test1URL != null); - OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA); + // ensure copy is returned + assertTrue(orc != projected); - String expectedSchema = "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer, long1: Long, " + - "float1: Float, double1: Double, bytes1: byte[], string1: String, " + - "middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>), " + - "list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>, " + - "map: Map<String, Row(int1: Integer, string1: String)>)"; + // ensure table schema is identical + assertEquals(orc.getTableSchema(), projected.getTableSchema()); - Assert.assertEquals(expectedSchema, orc.getReturnType().toString()); + // ensure return type was adapted + String[] fieldNames = getNestedFieldNames(); + TypeInformation[] fieldTypes = getNestedFieldTypes(); + assertEquals( + Types.ROW_NAMED( + new String[] {fieldNames[3], fieldNames[5], fieldNames[1], fieldNames[0]}, + new TypeInformation[] {fieldTypes[3], fieldTypes[5], fieldTypes[1], fieldTypes[0]}), + projected.getReturnType()); + // ensure IF is configured with selected fields + OrcTableSource spyTS = spy(projected); + OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class); + doReturn(mockIF).when(spyTS).buildOrcInputFormat(); + spyTS.getDataSet(mock(ExecutionEnvironment.class)); + verify(mockIF).selectFields(eq(3), eq(5), eq(1), eq(0)); } @Test - public void testOrcTableSchema() throws Exception { + public void testApplyPredicate() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - 
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); + OrcTableSource orc = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED)) + .forOrcSchema(TEST_SCHEMA_NESTED) + .build(); - assert(test1URL != null); - OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA); + // expressions for predicates + Expression pred1 = new GreaterThan( + new ResolvedFieldReference("int1", Types.INT), + new Literal(100, Types.INT)); + Expression pred2 = new EqualTo( + new ResolvedFieldReference("string1", Types.STRING), + new Literal("hello", Types.STRING)); + Expression pred3 = new EqualTo( + new GetCompositeField( + new ArrayElementAt( + new ResolvedFieldReference( + "list", + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING))), + new Literal(1, Types.INT)), + "int1"), + new Literal(1, Types.INT) + ); + ArrayList<Expression> preds = new ArrayList<>(); + preds.add(pred1); + preds.add(pred2); + preds.add(pred3); - tEnv.registerTableSource("orcTable", orc); - String query = "Select * from orcTable"; - Table t = tEnv.sql(query); + // apply predicates on TableSource + OrcTableSource projected = (OrcTableSource) orc.applyPredicate(preds); - String[] colNames = new String[] { - "boolean1", "byte1", "short1", "int1", "long1", "float1", - "double1", "bytes1", "string1", "list", "list0", "map" - }; + // ensure copy is returned + assertTrue(orc != projected); - RowTypeInfo rowTypeInfo = new RowTypeInfo( - new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO}, - new String[] {"int1", "string1"}); - - TypeInformation[] colTypes = new TypeInformation[] { - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.BYTE_TYPE_INFO, - BasicTypeInfo.SHORT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.FLOAT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - 
BasicTypeInfo.STRING_TYPE_INFO, - ObjectArrayTypeInfo.getInfoFor(rowTypeInfo), - ObjectArrayTypeInfo.getInfoFor(rowTypeInfo), - new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo) - }; - TableSchema expectedTableSchema = new TableSchema(colNames, colTypes); + // ensure table schema is identical + assertEquals(orc.getTableSchema(), projected.getTableSchema()); + + // ensure return type is identical + assertEquals( + Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()), + projected.getReturnType()); + + // ensure IF is configured with supported predicates + OrcTableSource spyTS = spy(projected); + OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class); + doReturn(mockIF).when(spyTS).buildOrcInputFormat(); + spyTS.getDataSet(mock(ExecutionEnvironment.class)); + + ArgumentCaptor<OrcRowInputFormat.Predicate> arguments = ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class); + verify(mockIF, times(2)).addPredicate(arguments.capture()); + List<String> values = arguments.getAllValues().stream().map(Object::toString).collect(Collectors.toList()); + assertTrue(values.contains( + new OrcRowInputFormat.Not(new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100)).toString())); + assertTrue(values.contains( + new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello").toString())); + + // ensure filter pushdown is correct + assertTrue(spyTS.isFilterPushedDown()); + assertFalse(orc.isFilterPushedDown()); + } + + private String getPath(String fileName) { + return getClass().getClassLoader().getResource(fileName).getPath(); + } - Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames); - Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes); - Assert.assertEquals(expectedTableSchema.toString(), t.getSchema().toString()); + private String[] getNestedFieldNames() { + return new String[] { + "boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", "bytes1", "string1", "middle", "list", "map" + }; 
+ } + private TypeInformation[] getNestedFieldTypes() { + return new TypeInformation[]{ + Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING, + Types.ROW_NAMED( + new String[]{"list"}, + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ) + ), + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ), + new MapTypeInfo<>( + Types.STRING, + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ) + }; } } http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java new file mode 100644 index 0000000..2cb1715 --- /dev/null +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.orc.TypeDescription; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit tests for {@link OrcUtils}. + * + */ +public class OrcUtilsTest { + + @Test + public void testFlatSchemaToTypeInfo1() { + + String schema = + "struct<" + + "boolean1:boolean," + + "byte1:tinyint," + + "short1:smallint," + + "int1:int," + + "long1:bigint," + + "float1:float," + + "double1:double," + + "bytes1:binary," + + "string1:string," + + "date1:date," + + "timestamp1:timestamp," + + "decimal1:decimal(5,2)" + + ">"; + TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema)); + + Assert.assertNotNull(typeInfo); + Assert.assertTrue(typeInfo instanceof RowTypeInfo); + RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; + + // validate field types + Assert.assertArrayEquals( + new TypeInformation[]{ + Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING, + Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO + }, + rowTypeInfo.getFieldTypes()); + + // validate field names + Assert.assertArrayEquals( + new String[] { + "boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", + "bytes1", "string1", "date1", "timestamp1", "decimal1" + }, + rowTypeInfo.getFieldNames()); + + } + + @Test + public void testNestedSchemaToTypeInfo1() { + + String 
schema = + "struct<" + + "middle:struct<" + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">," + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">," + + "map:map<" + + "string," + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">"; + TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema)); + + Assert.assertNotNull(typeInfo); + Assert.assertTrue(typeInfo instanceof RowTypeInfo); + RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; + + // validate field types + Assert.assertArrayEquals( + new TypeInformation[]{ + Types.ROW_NAMED( + new String[]{"list"}, + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ) + ), + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ), + new MapTypeInfo<>( + Types.STRING, + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ) + }, + rowTypeInfo.getFieldTypes()); + + // validate field names + Assert.assertArrayEquals( + new String[] {"middle", "list", "map"}, + rowTypeInfo.getFieldNames()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java deleted file mode 100644 index 60008a0..0000000 --- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java +++ /dev/null @@ -1,472 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.types.Row; - -import org.apache.hadoop.conf.Configuration; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Tests for the {@link RowOrcInputFormat}. 
- */ - -public class RowOrcInputFormatTest { - - private RowOrcInputFormat rowOrcInputFormat; - - @After - public void tearDown() throws IOException { - if (rowOrcInputFormat != null) { - rowOrcInputFormat.close(); - rowOrcInputFormat.closeInputFormat(); - } - rowOrcInputFormat = null; - } - - private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc"); - - private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," + - "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," + - "middle:struct<list:array<struct<int1:int,string1:string>>>," + - "list:array<struct<int1:int,string1:string>>," + - "map:map<string,struct<int1:int,string1:string>>>"; - - private static final String[] TEST1_DATA = new String[] { - "false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}", - "true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," + - "[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" }; - - private static final String[] TEST1_PROJECTED_DATA = new String[] { - "{},[3,good, 4,bad],[1,bye, 2,sigh],hi,[0, 1, 2, 3, 4],-15.0,1.0,9223372036854775807,65536,1024,1,false", - "{chani=5,chani, mauddib=1,mauddib},[100000000,cat, -100000,in, 1234,hat],[1,bye, 2,sigh],bye," + - "[],-5.0,2.0,9223372036854775807,65536,2048,100,true" }; - - private static final String TEST1_INVALID_SCHEMA = "struct<boolean1:int,byte1:tinyint,short1:smallint,int1:int," + - "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," + - "middle:struct<list:array<struct<int1:int,string1:string>>>," + - "list:array<struct<int1:int,string1:string>>," + - "map:map<string,struct<int1:int,string1:string>>>"; - - @Test(expected = FileNotFoundException.class) - public void testInvalidPath() throws IOException{ - - rowOrcInputFormat = new RowOrcInputFormat("TestOrcFile.test2.orc", TEST1_SCHEMA, new Configuration()); - 
rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - rowOrcInputFormat.open(inputSplits[0]); - - } - - @Test(expected = RuntimeException.class) - public void testInvalidSchema() throws IOException{ - - assert(test1URL != null); - rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_INVALID_SCHEMA, new Configuration()); - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - rowOrcInputFormat.open(inputSplits[0]); - - } - - @Test(expected = IndexOutOfBoundsException.class) - public void testInvalidProjection() throws IOException{ - - assert(test1URL != null); - rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration()); - int[] projectionMask = {14}; - rowOrcInputFormat.setFieldMapping(projectionMask); - } - - @Test - public void testMajorDataTypes() throws IOException{ - - // test for boolean,byte,short,int,long,float,double,bytes,string,struct,list,map - assert(test1URL != null); - rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration()); - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - Row row = null; - int count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - Assert.assertEquals(row.toString(), TEST1_DATA[count++]); - } - } - } - - @Test - public void testProjection() throws IOException{ - - assert(test1URL != null); - rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration()); - int[] projectionMask = {11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}; - rowOrcInputFormat.setFieldMapping(projectionMask); - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = 
rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - Row row = null; - int count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - Assert.assertEquals(row.toString(), TEST1_PROJECTED_DATA[count++]); - } - } - - } - - @Test - public void testTimeStampAndDate() throws IOException{ - - URL expectedDataURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.dat"); - assert(expectedDataURL != null); - List<String> expectedTimeStampAndDate = Files.readAllLines(Paths.get(expectedDataURL.getPath())); - - URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - String schema = "struct<time:timestamp,date:date>"; - rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration()); - rowOrcInputFormat.openInputFormat(); - - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - List<Object> actualTimeStampAndDate = new ArrayList<>(); - - Row row = null; - int count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - count++; - if (count <= 10000) { - actualTimeStampAndDate.add(row.getField(0) + "," + row.getField(1)); - } - - } - } - Assert.assertEquals(count, 70000); - Assert.assertEquals(expectedTimeStampAndDate.size(), actualTimeStampAndDate.size()); - Assert.assertEquals(expectedTimeStampAndDate.toString(), actualTimeStampAndDate.toString()); - - } - - @Test - public void testDecimal() throws IOException{ - - URL expectedDataURL = getClass().getClassLoader().getResource("decimal.dat"); - List<String> expectedDecimal = Files.readAllLines(Paths.get(expectedDataURL.getPath())); - - URL testInputURL 
= getClass().getClassLoader().getResource("decimal.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - String schema = "struct<_col0:decimal(10,5)>"; - rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration()); - rowOrcInputFormat.openInputFormat(); - - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - List<Object> actualDecimal = new ArrayList<>(); - - Row row = null; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - actualDecimal.add(row.getField(0)); - } - } - - Assert.assertEquals(expectedDecimal.size(), actualDecimal.size()); - Assert.assertEquals(expectedDecimal.toString(), actualDecimal.toString()); - - } - - @Test - public void testEmptyFile() throws IOException{ - - URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.emptyFile.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - - rowOrcInputFormat = new RowOrcInputFormat(path, TEST1_SCHEMA, new Configuration()); - rowOrcInputFormat.openInputFormat(); - - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - Row row = new Row(1); - int count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - count++; - } - } - - Assert.assertEquals(count, 0); - } - - @Test - public void testLargeFile() throws IOException{ - - URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," + - "_col5:string,_col6:int,_col7:int,_col8:int>"; - - rowOrcInputFormat = new 
RowOrcInputFormat(path, schema, new Configuration()); - rowOrcInputFormat.openInputFormat(); - - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - Row row = new Row(1); - int count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - count++; - } - } - - Assert.assertEquals(count, 1920800); - } - - @Test - public void testProducedType() throws IOException{ - - assert(test1URL != null); - rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration()); - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - rowOrcInputFormat.open(inputSplits[0]); - - TypeInformation<Row> type = rowOrcInputFormat.getProducedType(); - Assert.assertEquals(type.toString(), "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer," + - " long1: Long, float1: Float, double1: Double, bytes1: byte[], string1: String," + - " middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," + - " list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," + - " map: Map<String, Row(int1: Integer, string1: String)>)"); - - } - - @Test - public void testProducedTypeWithProjection() throws IOException{ - - assert(test1URL != null); - rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration()); - int[] projectionMask = {9, 10, 11}; - rowOrcInputFormat.setFieldMapping(projectionMask); - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - rowOrcInputFormat.open(inputSplits[0]); - - TypeInformation<Row> type = rowOrcInputFormat.getProducedType(); - Assert.assertEquals(type.toString(), "Row(middle: 
Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," + - " list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," + - " map: Map<String, Row(int1: Integer, string1: String)>)"); - - } - - @Test - public void testLongList() throws Exception { - - URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listlong.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - String schema = "struct<mylist1:array<bigint>>"; - - rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration()); - - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - Row row = null; - long count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - Assert.assertEquals(row.getArity(), 1); - Object object = row.getField(0); - long[] l = (long[]) object; - - Assert.assertEquals(l.length, 2); - if (count < 50) { - Assert.assertArrayEquals(l, new long[]{count, count + 1}); - } - else { - Assert.assertArrayEquals(l, new long[]{0L, 0L}); - } - count = count + 2; - } - } - Assert.assertEquals(count, 100); - } - - @Test - public void testStringList() throws Exception { - - URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.liststring.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - String schema = "struct<mylist1:array<string>>"; - - rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration()); - - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - Row row = null; - long count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - row = 
rowOrcInputFormat.nextRecord(row); - Assert.assertEquals(row.getArity(), 1); - Object object = row.getField(0); - String[] l = (String[]) object; - - Assert.assertEquals(l.length, 2); - Assert.assertArrayEquals(l, new String[]{"hello" + count, "hello" + (count + 1) }); - count = count + 2; - } - } - Assert.assertEquals(count, 200); - } - - @Test - public void testListOfListOfStructOfLong() throws Exception { - URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listliststructlong.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - String schema = "struct<mylist1:array<array<struct<mylong1:bigint>>>>"; - - rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration()); - - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - - Assert.assertEquals(inputSplits.length, 1); - - Row row = null; - long count = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - while (!rowOrcInputFormat.reachedEnd()) { - - row = rowOrcInputFormat.nextRecord(row); - Assert.assertEquals(row.getArity(), 1); - - Object[] objects = (Object[]) row.getField(0); - Assert.assertEquals(objects.length, 1); - - Object[] objects1 = (Object[]) objects[0]; - Assert.assertEquals(objects1.length, 1); - - Row[] nestedRows = Arrays.copyOf(objects1, objects1.length, Row[].class); - Assert.assertEquals(nestedRows.length, 1); - - Assert.assertEquals(nestedRows[0].getArity(), 1); - - Assert.assertEquals(nestedRows[0].getField(0), count); - - count++; - } - } - Assert.assertEquals(count, 100); - } - - @Test - public void testSplit() throws IOException{ - - URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc"); - assert(testInputURL != null); - String path = testInputURL.getPath(); - String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," + - "_col5:string,_col6:int,_col7:int,_col8:int>"; - - rowOrcInputFormat 
= new RowOrcInputFormat(path, schema, new Configuration()); - rowOrcInputFormat.openInputFormat(); - - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(10); - - Assert.assertEquals(inputSplits.length, 10); - - Row row = null; - int countTotalRecords = 0; - for (FileInputSplit split : inputSplits) { - rowOrcInputFormat.open(split); - int countSplitRecords = 0; - while (!rowOrcInputFormat.reachedEnd()) { - row = rowOrcInputFormat.nextRecord(row); - countSplitRecords++; - } - Assert.assertNotEquals(countSplitRecords, 1920800); - countTotalRecords += countSplitRecords; - } - - Assert.assertEquals(countTotalRecords, 1920800); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc deleted file mode 100644 index ecdadcb..0000000 Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc deleted file mode 100644 index 0f3f9c8..0000000 Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc deleted file mode 100644 index 648ea18..0000000 Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc deleted file mode 100644 index 75a5f2a..0000000 Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc deleted file mode 100644 index 4fb0bef..0000000 Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc and /dev/null differ