[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263443#comment-16263443
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5043


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263442#comment-16263442
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4670




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262976#comment-16262976
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5043
  
Thanks for the review @twalthr!
I'll address your comments and will merge this PR.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262973#comment-16262973
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152634076
  
--- Diff: 
flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link OrcTableSource}.
+ */
+public class OrcTableSourceITCase extends MultipleProgramsTestBase {
--- End diff --

`TableProgramsTestBase` doesn't really give any advantage over 
`MultipleProgramsTestBase` since I'm testing the table source and not the 
null-behavior of the query execution.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262597#comment-16262597
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152580041
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,1511 @@
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcUtils {
+
+   private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Converts an ORC schema to a Flink TypeInformation.
+*
+* @param schema The ORC schema.
+* @return The TypeInformation that corresponds to the ORC schema.
+*/
+   static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case DECIMAL:
+   return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+  

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262593#comment-16262593
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152579106
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of fields rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE);

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262591#comment-16262591
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152578874
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of fields rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
--- End diff --

Ok, sorry, I didn't check for custom serialization.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262583#comment-16262583
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152577385
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -62,7 +62,19 @@ class FlinkLogicalTableSourceScan(
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 val rowCnt = metadata.getRowCount(this)
-planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+
+val adjustedCnt: Double = tableSource match {
+  case f: FilterableTableSource[_] if f.isFilterPushedDown =>
+// ensure we prefer FilterableTableSources with pushed-down filters.
+rowCnt - 1.0
--- End diff --

Doesn't really make a difference IMO. It's all about relative costs. We 
only need to make sure that a `FilterableTableSource` with pushed down 
predicates appears to be less expensive than the same `TableSource` without 
pushed predicates.

The problem has not occurred before, because the `OrcTableSource` does not 
guarantee that all emitted rows match the pushed predicates. Therefore, all 
predicates are also applied by a following `Calc` operator such that the cost 
of that operator is not decreased.
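
The relative-cost argument can be illustrated with a minimal sketch in plain Java. This is not Flink's or Calcite's planner API: the class `ScanCostSketch` and both method names are hypothetical, and the only thing taken from the diff is the `rowCnt - 1.0` adjustment. The sketch shows that any small reduction is enough to make the scan with pushed-down filters compare as cheaper than the otherwise identical scan.

```java
// Hypothetical stand-in for the planner's cost comparison; not Flink or Calcite API.
final class ScanCostSketch {

    /** Row-count estimate used for costing a scan, following the `rowCnt - 1.0` idea from the diff. */
    static double adjustedRowCount(double rowCnt, boolean filterPushedDown) {
        // The subtraction only has to break the tie between two otherwise
        // identical scans, so the size of the constant does not matter.
        return filterPushedDown ? rowCnt - 1.0 : rowCnt;
    }

    /** Returns true if the scan with pushed-down filters is estimated as strictly cheaper. */
    static boolean prefersPushedDown(double rowCnt) {
        return adjustedRowCount(rowCnt, true) < adjustedRowCount(rowCnt, false);
    }

    public static void main(String[] args) {
        System.out.println(prefersPushedDown(1000.0));
    }
}
```

Because the following `Calc` operator still applies all predicates, the real saving is not visible in its cost; the adjustment above only encodes the preference.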




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262546#comment-16262546
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152571674
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -355,4 +355,21 @@ public void addComparatorField(int fieldId, TypeComparator comparator) {
comparatorOrders);
}
}
+
+   /**
+* Creates a {@link RowTypeInfo} with projected fields.
+*
+* @param rowType The original RowTypeInfo whose fields are projected
+* @param fieldMapping The field mapping of the projection
+* @return A RowTypeInfo with projected fields.
+*/
+   public static RowTypeInfo projectFields(RowTypeInfo rowType, int[] fieldMapping) {
--- End diff --

I think it is cleaner to have this as a static method rather than an instance 
method. A static method makes it explicit that this creates a new (immutable) 
`RowTypeInfo`.
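
A minimal sketch of that design choice, in plain Java. `RowTypeSketch` is a hypothetical, simplified stand-in for `RowTypeInfo` that tracks only field names; only the method name `projectFields` and its `(rowType, fieldMapping)` parameter shape come from the diff. The static method reads the input and returns a fresh object, so the original is left untouched.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for RowTypeInfo; it only tracks field names.
final class RowTypeSketch {
    private final String[] fieldNames;

    RowTypeSketch(String... fieldNames) {
        // Defensive copy so later changes to the caller's array do not leak in.
        this.fieldNames = fieldNames.clone();
    }

    String[] getFieldNames() {
        return fieldNames.clone();
    }

    /** Builds a new instance holding only the fields selected by fieldMapping, in that order. */
    static RowTypeSketch projectFields(RowTypeSketch rowType, int[] fieldMapping) {
        String[] projected = new String[fieldMapping.length];
        for (int i = 0; i < fieldMapping.length; i++) {
            projected[i] = rowType.fieldNames[fieldMapping[i]];
        }
        return new RowTypeSketch(projected);
    }

    public static void main(String[] args) {
        RowTypeSketch original = new RowTypeSketch("id", "name", "price");
        RowTypeSketch projected = projectFields(original, new int[] {2, 0});
        System.out.println(Arrays.toString(projected.getFieldNames()));
        System.out.println(Arrays.toString(original.getFieldNames()));
    }
}
```

An instance method such as `rowType.projectFields(mapping)` could be misread as modifying `rowType` in place; the static form avoids that ambiguity.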




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262338#comment-16262338
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152536536
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,1511 @@
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcUtils {
+
+   private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Converts an ORC schema to a Flink TypeInformation.
+*
+* @param schema The ORC schema.
+* @return The TypeInformation that corresponds to the ORC schema.
+*/
+   static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case DECIMAL:
+   return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+  

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262330#comment-16262330
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152535711
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of fields rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE);

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262314#comment-16262314
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152506343
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,1511 @@
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcUtils {
+
+   private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Converts an ORC schema to a Flink TypeInformation.
+*
+* @param schema The ORC schema.
+* @return The TypeInformation that corresponds to the ORC schema.
+*/
+   static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case DECIMAL:
+   return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+  

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262307#comment-16262307
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152499605
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -485,6 +486,50 @@ val csvTableSource = CsvTableSource
 
 {% top %}
 
+### OrcTableSource
+
+The `OrcTableSource` reads [ORC files](https://orc.apache.org). ORC is a 
file format for structured data and stores the data in a compressed, columnar 
representation. ORC is very storage efficient and supports projection and 
filter push-down.
+
+An `OrcTableSource` is created as shown below:
+
+
+
+{% highlight java %}
+
+// create Hadoop Configuration
+Configuration config = new Configuration();
+
+OrcTableSource orcTableSource = new OrcTableSource(
--- End diff --

Should we also provide a builder, as is done for all other sources, to be 
consistent?
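
For illustration, a builder for the source could look roughly like the sketch below. `OrcSourceSketch` and the method names `path`, `forOrcSchema`, `withConfiguration` and `batchSize` are hypothetical stand-ins chosen for this example, not the API in the PR, and the Hadoop `Configuration` is replaced by a plain `Map` so that the snippet compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for OrcTableSource; not the class from the PR.
final class OrcSourceSketch {
    final String path;
    final String orcSchema;
    final Map<String, String> config;
    final int batchSize;

    private OrcSourceSketch(String path, String orcSchema, Map<String, String> config, int batchSize) {
        this.path = path;
        this.orcSchema = orcSchema;
        this.config = config;
        this.batchSize = batchSize;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String path;
        private String orcSchema;
        private Map<String, String> config = new HashMap<>();
        private int batchSize = 1000; // same value as DEFAULT_BATCH_SIZE in the diff

        Builder path(String path) {
            this.path = path;
            return this;
        }

        Builder forOrcSchema(String orcSchema) {
            this.orcSchema = orcSchema;
            return this;
        }

        Builder withConfiguration(Map<String, String> config) {
            this.config = config;
            return this;
        }

        Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        // Validates the mandatory parameters before creating the source.
        OrcSourceSketch build() {
            if (path == null || path.isEmpty()) {
                throw new IllegalStateException("path must be set");
            }
            if (orcSchema == null || orcSchema.isEmpty()) {
                throw new IllegalStateException("ORC schema must be set");
            }
            return new OrcSourceSketch(path, orcSchema, new HashMap<>(config), batchSize);
        }
    }

    public static void main(String[] args) {
        OrcSourceSketch source = OrcSourceSketch.builder()
            .path("file:///path/to/data")
            .forOrcSchema("struct<name:string,age:int>")
            .build();
        System.out.println(source.path + " batchSize=" + source.batchSize);
    }
}
```

Compared with a multi-argument constructor, optional settings such as the batch size get defaults, and missing mandatory ones fail early in `build()`.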




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262310#comment-16262310
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152519868
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -355,4 +355,21 @@ public void addComparatorField(int fieldId, 
TypeComparator comparator) {
comparatorOrders);
}
}
+
+   /**
+* Creates a {@link RowTypeInfo} with projected fields.
+*
+* @param rowType The original RowTypeInfo whose fields are projected
+* @param fieldMapping The field mapping of the projection
+* @return A RowTypeInfo with projected fields.
+*/
+   public static RowTypeInfo projectFields(RowTypeInfo rowType, int[] 
fieldMapping) {
--- End diff --

Why is this method static?
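
One way to read the question: the projection depends only on the row type itself, so it could equally be an instance method. The sketch below shows both forms side by side. `RowTypeSketch` is a hypothetical, simplified stand-in (field names and type names only), not Flink's `RowTypeInfo`, and the instance method name `project` is an assumption.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for RowTypeInfo.
final class RowTypeSketch {
    final String[] fieldNames;
    final String[] fieldTypes;

    RowTypeSketch(String[] fieldNames, String[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("names and types must have the same length");
        }
        this.fieldNames = fieldNames.clone();
        this.fieldTypes = fieldTypes.clone();
    }

    // Static form, as in the diff: the row type is passed in explicitly.
    static RowTypeSketch projectFields(RowTypeSketch rowType, int[] fieldMapping) {
        String[] names = new String[fieldMapping.length];
        String[] types = new String[fieldMapping.length];
        for (int i = 0; i < fieldMapping.length; i++) {
            names[i] = rowType.fieldNames[fieldMapping[i]];
            types[i] = rowType.fieldTypes[fieldMapping[i]];
        }
        return new RowTypeSketch(names, types);
    }

    // Instance form: same logic, called on the row type being projected.
    RowTypeSketch project(int[] fieldMapping) {
        return projectFields(this, fieldMapping);
    }

    public static void main(String[] args) {
        RowTypeSketch rowType = new RowTypeSketch(
            new String[] {"id", "name", "ts"},
            new String[] {"INT", "STRING", "TIMESTAMP"});
        RowTypeSketch projected = rowType.project(new int[] {2, 0});
        System.out.println(Arrays.toString(projected.fieldNames)); // prints [ts, id]
    }
}
```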




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262304#comment-16262304
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152518464
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList<Predicate> conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, 
DEFAULT_BATCH_SIZE);

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262312#comment-16262312
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152531937
  
--- Diff: 
flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link OrcTableSource}.
+ */
+public class OrcTableSourceITCase extends MultipleProgramsTestBase {
--- End diff --

`TableProgramsTestBase` ?




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262309#comment-16262309
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152503422
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList<Predicate> conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, 
DEFAULT_BATCH_SIZE);

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262317#comment-16262317
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152502704
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
--- End diff --

`Configuration` is not serializable.
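
A common way to carry a non-serializable object inside a serializable class is to mark the field `transient` and write its contents by hand in `writeObject`/`readObject`. The sketch below is self-contained and uses hypothetical stand-ins: `FakeConf` for Hadoop's `Configuration` and `FormatSketch` for the input format. Since Hadoop's `Configuration` implements `Writable`, real code could presumably call `conf.write(out)` and `conf.readFields(in)` instead of the manual map copy shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the input format that holds the configuration.
final class FormatSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical stand-in for a config class that is NOT Serializable.
    static final class FakeConf {
        final Map<String, String> entries = new HashMap<>();
    }

    private int batchSize;
    // transient: skipped by default serialization and restored manually below
    private transient FakeConf conf;

    FormatSketch(int batchSize, FakeConf conf) {
        this.batchSize = batchSize;
        this.conf = conf;
    }

    int getBatchSize() {
        return batchSize;
    }

    FakeConf getConf() {
        return conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes the non-transient fields (batchSize)
        out.writeInt(conf.entries.size());
        for (Map.Entry<String, String> e : conf.entries.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new FakeConf();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            String key = in.readUTF();
            String value = in.readUTF();
            conf.entries.put(key, value);
        }
    }

    // Serializes and deserializes the given object in memory.
    static FormatSketch roundTrip(FormatSketch format) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FormatSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeConf conf = new FakeConf();
        conf.entries.put("some.key", "value");
        FormatSketch copy = roundTrip(new FormatSketch(1000, conf));
        System.out.println(copy.getBatchSize() + " " + copy.getConf().entries);
    }
}
```

Without the `transient` marker and the two hooks, serializing the object would fail with a `NotSerializableException` for the configuration field.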




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262308#comment-16262308
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152518885
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList<Predicate> conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, 
DEFAULT_BATCH_SIZE);

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262319#comment-16262319
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152508692
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,1511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcUtils {
+
+   private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Converts an ORC schema to a Flink TypeInformation.
+*
+* @param schema The ORC schema.
+* @return The TypeInformation that corresponds to the ORC schema.
+*/
+   static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case DECIMAL:
+   return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+  

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262318#comment-16262318
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152533796
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.orc.OrcRowInputFormat.Predicate;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.IsFalse;
+import org.apache.flink.table.expressions.IsNotNull;
+import org.apache.flink.table.expressions.IsNull;
+import org.apache.flink.table.expressions.IsTrue;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.expressions.UnaryExpression;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.orc.TypeDescription;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A TableSource to read ORC files.
+ *
+ * The {@link OrcTableSource} supports projection and filter push-down.
+ *
+ * An {@link OrcTableSource} is used as shown in the example below.
+ *
+ * 
+ * {@code
+ * String path = "file:///my/data/file.orc";
+ * String schema = 
"struct"
+ * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * tEnv.registerTableSource("orcTable", orcSrc);
+ * Table res = tEnv.sql("SELECT * FROM orcTable");
+ * }
+ * 
+ */
+public class OrcTableSource
+   implements BatchTableSource, ProjectableTableSource, 
FilterableTableSource {
+
+   private static final int DEFAULT_BATCH_SIZE = 1024;
+
+   // path to read ORC files from
+   private final String path;
+   // schema of the ORC file
+   private final TypeDescription orcSchema;
+   // the schema of the Table
+   private final TableSchema tableSchema;
+   // the configuration to read the file
+   private final Configuration orcConfig;
+   // the number of rows to read in a batch
+   private final int batchSize;
+
+   // type information of the data returned by the InputFormat
+   private final RowTypeInfo typeInfo;
+   // list of selected ORC fields to return
+   private final int[] selectedFields;
+   // list of predicates to 

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262303#comment-16262303
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152501940
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -0,0 +1,112 @@
+
+
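+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connectors</artifactId>
+       <version>1.5-SNAPSHOT</version>
+       <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-orc_${scala.binary.version}</artifactId>
+   <name>flink-orc</name>
+
+   <packaging>jar</packaging>
+
+   <dependencies>
+
+   
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-table_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>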

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262306#comment-16262306
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152513215
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,1511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcUtils {
+
+   private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Converts an ORC schema to a Flink TypeInformation.
+*
+* @param schema The ORC schema.
+* @return The TypeInformation that corresponds to the ORC schema.
+*/
+   static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case DECIMAL:
+   return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+  

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262316#comment-16262316
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152524790
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat implements 
ResultTypeQueryable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, 
DEFAULT_BATCH_SIZE);

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262311#comment-16262311
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152526459
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
 ---

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262305#comment-16262305
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152510004
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262313#comment-16262313
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152532789
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -62,7 +62,19 @@ class FlinkLogicalTableSourceScan(
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
     val rowCnt = metadata.getRowCount(this)
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+
+    val adjustedCnt: Double = tableSource match {
+      case f: FilterableTableSource[_] if f.isFilterPushedDown =>
+        // ensure we prefer FilterableTableSources with pushed-down filters.
+        rowCnt - 1.0
--- End diff --

Shouldn't this be something like `rowCnt * 0.33`?
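
To make the difference between the two adjustments concrete, here is a minimal Java sketch. The class and method names are hypothetical and not part of the PR, and 0.33 is only the factor suggested above, not a measured selectivity.

```java
// Illustrative only: compares two ways of discounting the estimated row count
// of a table scan whose filters were pushed into the source.
// All names here are hypothetical; nothing below is Flink API.
final class ScanCostSketch {

    /** Discount used in the diff: subtract a constant of 1.0 from the row count. */
    static double subtractOne(double rowCnt) {
        return rowCnt - 1.0;
    }

    /** Alternative raised in the review: scale the row count by an assumed selectivity. */
    static double scaleBySelectivity(double rowCnt, double selectivity) {
        return rowCnt * selectivity;
    }

    public static void main(String[] args) {
        double[] rowCounts = {1.0, 100.0, 1_000_000.0};
        for (double rowCnt : rowCounts) {
            System.out.println(rowCnt + " rows: minus one = " + subtractOne(rowCnt)
                + ", scaled = " + scaleBySelectivity(rowCnt, 0.33));
        }
    }
}
```

Both variants make the scan with pushed-down filters cheaper than the one without, so either is enough to steer the planner's choice. The constant keeps the estimate almost unchanged for large inputs, while the factor also shrinks whatever downstream operators derive from the row count.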



[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262315#comment-16262315
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152531212
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
 ---

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261489#comment-16261489
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5043

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

## What is the purpose of the change

* Adds `OrcRowInputFormat` to read [ORC files](https://orc.apache.org) as 
`DataSet`. The input format supports projection and filter push-down.
* Adds `OrcTableSource` to read [ORC files](https://orc.apache.org) as a 
`Table` in a batch Table API or SQL query. The table source supports projection 
and filter push-down.
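
As background on why filter push-down pays off for a columnar format, the following Java sketch shows the general idea of pruning on min/max statistics: a stripe is read only if its statistics cannot rule out every pushed-down conjunct. It is an illustration under simplifying assumptions (one `long` column, only `column > literal` predicates), all type and method names are hypothetical, and it is not how `OrcRowInputFormat` or ORC's `SearchArgument` is implemented.

```java
// Illustrative only: statistics-based pruning for conjunctive predicates.
// Every name below is hypothetical; this is not the PR's code.
final class StripePruningSketch {

    /** Min/max statistics of one column within one stripe. */
    static final class ColumnStats {
        final long min;
        final long max;

        ColumnStats(long min, long max) {
            this.min = min;
            this.max = max;
        }
    }

    /** A pushed-down predicate of the form "column > literal". */
    static final class GreaterThanLiteral {
        final long literal;

        GreaterThanLiteral(long literal) {
            this.literal = literal;
        }

        /** True if some value in [min, max] could satisfy the predicate. */
        boolean mightMatch(ColumnStats stats) {
            return stats.max > literal;
        }
    }

    /** Conjunctive semantics: the stripe must be read only if no predicate rules it out. */
    static boolean mustReadStripe(ColumnStats stats, GreaterThanLiteral... conjuncts) {
        for (GreaterThanLiteral predicate : conjuncts) {
            if (!predicate.mightMatch(stats)) {
                return false;
            }
        }
        return true;
    }
}
```

With statistics `[0, 10]`, the predicate `column > 5` keeps the stripe, while `column > 10` allows it to be skipped without reading any rows.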

## Brief change log

* Add a new module `flink-connectors/flink-orc`
* Add `OrcRowInputFormat`
* Add `OrcTableSource`
* Add tests for input format and table source
* Adjust cost model of batch table scans to favor table sources with pushed-down filters over those without pushed-down filters.
* Add static method to `RowTypeInfo` to project on fields.
* Improve translation of literals in `RexProgramExtractor`
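
The field projection mentioned in the change log can be pictured with a small Java sketch. It works on plain field names rather than on `RowTypeInfo`, and the class and method names are hypothetical; the actual method added by the PR may look different.

```java
// Illustrative only: projecting a list of fields onto selected positions.
// Names are hypothetical and unrelated to the real RowTypeInfo API.
final class RowProjectionSketch {

    /** Returns the entries of fields at the given positions, in the given order. */
    static String[] project(String[] fields, int[] selectedFields) {
        String[] projected = new String[selectedFields.length];
        for (int i = 0; i < selectedFields.length; i++) {
            projected[i] = fields[selectedFields[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        String[] names = {"id", "name", "price"};
        // prints [price, id]
        System.out.println(java.util.Arrays.toString(project(names, new int[] {2, 0})));
    }
}
```

The index array plays the role of `selectedFields` in the table source: it both selects and reorders columns, so the projected row type follows the order requested by the query rather than the order in the file schema.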

## Verifying this change

* `OrcRowInputFormatTest` verifies 
  * Correct configuration of ORC readers.
  * Results when reading ORC files
  * Schema evolution support
  * Computation of split boundaries

* `OrcTableSourceTest` verifies
  * Correct implementation of TableSource interface methods
  * Correct configuration of `OrcRowInputFormat` for test queries (projection and filter push-down)

* `OrcTableSourceITCase` runs end-to-end tests with SQL queries.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **yes**, adds a new 
Maven module `flink-orc` with a dependency on `org.apache.orc/orc-core`
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **yes**, documentation for `OrcTableSource` was added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink table-ORC

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5043


commit 2f524dfa0c4f8468691151925a622ba7fee55f0f
Author: uybhatti 
Date:   2017-03-03T22:55:22Z

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

commit d80506e3785268f541457a69ade3118c634cf7e7
Author: Fabian Hueske 
Date:   2017-11-13T13:54:54Z

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.





[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212473#comment-16212473
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/4670
  
Any update on this? Reading ORC data would be a very nice feature.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182635#comment-16182635
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user uybhatti commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r141356723
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,2229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+public class OrcUtils {
+
+   /**
+* Convert ORC schema types to Flink types.
+*
+* @param schema schema of orc file
+*
+*/
+   public static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+   return SqlTimeTypeInfo.TIMESTAMP;
+   case BINARY:
+   return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+   case STRUCT:
+   List<TypeDescription> fieldSchemas = schema.getChildren();
+   TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
+   for (int i = 0; i < fieldSchemas.size(); i++) {
+ 

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178618#comment-16178618
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140697150
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -0,0 +1,152 @@
+
+
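+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connectors</artifactId>
+       <version>1.4-SNAPSHOT</version>
+       <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-orc_${scala.binary.version}</artifactId>
+   <name>flink-orc</name>
+
+   <packaging>jar</packaging>
+
+   <dependencies>
+
+   
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-table_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>compile</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>compile</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.orc</groupId>
+           <artifactId>orc-core</artifactId>
+           <version>1.4.0</version>
+       </dependency>
+
+   
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-test-utils-junit</artifactId>
+           <version>${project.version}</version>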
--- End diff --

There should be no need to specify `${project.version}` here, as it is 
already specified in the root-level pom.xml.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178628#comment-16178628
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699592
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
--- End diff --

The size of the batch needs to be configurable.
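
A minimal sketch of what a configurable batch size could look like. `BatchSizeSketch` and `setBatchSize` are hypothetical names and not part of the PR; only the default of 1024 comes from the `BATCH_SIZE` constant in the diff above.

```java
// Hypothetical stand-in for the input format; it only models the batch-size setting.
final class BatchSizeSketch {

    /** Default value, taken from the BATCH_SIZE constant in the reviewed diff. */
    static final int DEFAULT_BATCH_SIZE = 1024;

    private int batchSize = DEFAULT_BATCH_SIZE;

    /** Hypothetical setter: overrides the default and rejects non-positive sizes. */
    void setBatchSize(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException(
                "Batch size must be positive, got " + batchSize);
        }
        this.batchSize = batchSize;
    }

    int getBatchSize() {
        return batchSize;
    }

    public static void main(String[] args) {
        BatchSizeSketch format = new BatchSizeSketch();
        System.out.println(format.getBatchSize()); // default
        format.setBatchSize(4096);
        System.out.println(format.getBatchSize()); // overridden value
    }
}
```

In the input format itself, the configured value would presumably replace the constant at the point where the `VectorizedRowBatch` is allocated.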




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178637#comment-16178637
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699738
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
+   private TypeDescription schema;
+   private int[] fieldMapping;
+
+   private transient RowTypeInfo rowType;
+   private transient RecordReader orcRowsReader;
+   private transient VectorizedRowBatch rowBatch;
+   private transient Row[] rows;
+
+   private transient int rowInBatch;
+
+   public RowOrcInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig);
+   }
+
+   public RowOrcInputFormat(String path, TypeDescription orcSchema, 
Configuration orcConfig) {
+   super(new Path(path));
+   this.unsplittable = false;
+   this.schema = orcSchema;
+   this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+   this.config = orcConfig;
+
+   this.fieldMapping = new int[this.schema.getChildren().size()];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   this.fieldMapping[i] = i;
+   }
+
+   }
+
+   public void setFieldMapping(int[] fieldMapping) {
+   this.fieldMapping = fieldMapping;
+   // adapt result type
+
+   TypeInformation[] fieldTypes = new 
TypeInformation[fieldMapping.length];
+   String[] fieldNames = new String[fieldMapping.length];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
+   fieldNames[i] = 
this.rowType.getFieldNames()[fieldMapping[i]];
+   }
+   this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   private boolean[] computeProjectionMask() {
+   boolean[] projectionMask = new boolean[schema.getMaximumId() + 
1];
+   for (int inIdx : fieldMapping) {
+   TypeDescription fieldSchema = 
schema.getChildren().get(inIdx);
+   for (int i = 

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178623#comment-16178623
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140698379
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
--- End diff --

It makes sense to name this field consistently.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178629#comment-16178629
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140700317
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,2229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+public class OrcUtils {
+
+   /**
+* Convert ORC schema types to Flink types.
+*
+* @param schema schema of orc file
+*
+*/
+   public static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
--- End diff --

For clarity, this can be separated into two steps: look up the simple types 
in a map first, then handle the remaining categories explicitly. You can do 
something like:

```
TypeInformation ty = TYPE_MAP.get(schema.getCategory());
if (ty != null) {
  return ty;
} else if (schema.getCategory() == ...) {
} ...
```
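
One possible reading of this suggestion, as a minimal sketch. `TypeLookupSketch`, its `Category` enum, and the string values are hypothetical stand-ins (not the ORC `TypeDescription.Category` or Flink `TypeInformation` classes) so that the snippet compiles on its own; `TYPE_MAP` is the reviewer's name, but its contents here are assumed.

```java
import java.util.EnumMap;
import java.util.Map;

final class TypeLookupSketch {

    // Stand-in for a handful of ORC type categories (assumption, not the real enum).
    enum Category { BOOLEAN, INT, LONG, STRING, CHAR, VARCHAR, STRUCT, UNION }

    // Step 1: categories that map directly to a single type.
    // Strings stand in for the Flink type-information constants.
    private static final Map<Category, String> TYPE_MAP = new EnumMap<>(Category.class);

    static {
        TYPE_MAP.put(Category.BOOLEAN, "BOOLEAN_TYPE_INFO");
        TYPE_MAP.put(Category.INT, "INT_TYPE_INFO");
        TYPE_MAP.put(Category.LONG, "LONG_TYPE_INFO");
        TYPE_MAP.put(Category.STRING, "STRING_TYPE_INFO");
        TYPE_MAP.put(Category.CHAR, "STRING_TYPE_INFO");
        TYPE_MAP.put(Category.VARCHAR, "STRING_TYPE_INFO");
    }

    static String toTypeInfo(Category category) {
        String ty = TYPE_MAP.get(category);
        if (ty != null) {
            return ty;
        }
        // Step 2: nested categories need dedicated handling
        // (the real code would recurse into the child schemas here).
        if (category == Category.STRUCT) {
            return "ROW_TYPE_INFO_PLACEHOLDER";
        }
        throw new UnsupportedOperationException("Unsupported category: " + category);
    }

    public static void main(String[] args) {
        System.out.println(toTypeInfo(Category.VARCHAR));
        System.out.println(toTypeInfo(Category.STRUCT));
    }
}
```

The map keeps the one-to-one cases declarative, so the remaining branches only cover categories that need their children resolved.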




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178615#comment-16178615
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140697199
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -0,0 +1,152 @@
+
+
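+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connectors</artifactId>
+       <version>1.4-SNAPSHOT</version>
+       <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-orc_${scala.binary.version}</artifactId>
+   <name>flink-orc</name>
+
+   <packaging>jar</packaging>
+
+   <dependencies>
+
+   
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-table_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>compile</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>compile</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.orc</groupId>
+           <artifactId>orc-core</artifactId>
+           <version>1.4.0</version>
+       </dependency>
+
+   
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-test-utils-junit</artifactId>
+           <version>${project.version}</version>
+           <scope>test</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>test</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-clients_${scala.binary.version}</artifactId>
+           <version>${project.version}</version>
+           <scope>test</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-core</artifactId>
+           <version>${project.version}</version>
+           <scope>test</scope>
+           <type>test-jar</type>
+       </dependency>
+   </dependencies>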
+
+   
+
+   
--- End diff --

These look like unnecessary changes.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178627#comment-16178627
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140697889
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
+   private TypeDescription schema;
+   private int[] fieldMapping;
+
+   private transient RowTypeInfo rowType;
+   private transient RecordReader orcRowsReader;
+   private transient VectorizedRowBatch rowBatch;
+   private transient Row[] rows;
+
+   private transient int rowInBatch;
+
+   public RowOrcInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig);
+   }
+
+   public RowOrcInputFormat(String path, TypeDescription orcSchema, 
Configuration orcConfig) {
+   super(new Path(path));
+   this.unsplittable = false;
+   this.schema = orcSchema;
+   this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+   this.config = orcConfig;
+
+   this.fieldMapping = new int[this.schema.getChildren().size()];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   this.fieldMapping[i] = i;
+   }
+
+   }
+
+   public void setFieldMapping(int[] fieldMapping) {
+   this.fieldMapping = fieldMapping;
+   // adapt result type
+
+   TypeInformation[] fieldTypes = new 
TypeInformation[fieldMapping.length];
+   String[] fieldNames = new String[fieldMapping.length];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
+   fieldNames[i] = 
this.rowType.getFieldNames()[fieldMapping[i]];
+   }
+   this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   private boolean[] computeProjectionMask() {
+   boolean[] projectionMask = new boolean[schema.getMaximumId() + 
1];
+   for (int inIdx : fieldMapping) {
+   TypeDescription fieldSchema = 
schema.getChildren().get(inIdx);
+   for (int i = 

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178617#comment-16178617
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140698914
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,2229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+public class OrcUtils {
--- End diff --

The class can be package-private.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178635#comment-16178635
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699375
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -0,0 +1,152 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.4-SNAPSHOT
+   ..
+   
+
+   flink-orc_${scala.binary.version}
+   flink-orc
+
+   jar
+
+   
+
+   
+
+   
--- End diff --

The `${project.version}` entries are unnecessary, as the version has already 
been specified at the root level of pom.xml.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178622#comment-16178622
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140697367
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Creates a TableSource to read ORC file.
+ *
+ * The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ *
+ * The OrcTableSource is used as shown in the example below.
+ *
+ * 
+ * {@code
+ * String path = testInputURL.getPath();
+ * String schema = 
"struct"
+ * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * tEnv.registerTableSource("orcTable", orcSrc);
+ * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * }
+ * 
+ */
+public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private String path;
--- End diff --

All of the fields can be final.
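
A minimal sketch of the idea, assuming a simplified, hypothetical `ImmutableSource` class rather than the `OrcTableSource` from the pull request. Every field is `final`, and a projection returns a new instance instead of mutating the existing one, so the field mapping can stay immutable as well. All names below are illustrative assumptions.

```java
import java.util.Arrays;

// Hypothetical, simplified table source; names are illustrative and not taken from the PR.
final class ImmutableSource {

    private final String path;
    private final int[] selectedFields;

    ImmutableSource(String path, int[] selectedFields) {
        this.path = path;
        // Defensive copy so later changes to the caller's array cannot leak in.
        this.selectedFields = Arrays.copyOf(selectedFields, selectedFields.length);
    }

    /** Returns a new source restricted to the given fields instead of mutating this one. */
    ImmutableSource projectFields(int[] fields) {
        return new ImmutableSource(path, fields);
    }

    String getPath() {
        return path;
    }

    int[] getSelectedFields() {
        // Return a copy so callers cannot modify the internal array.
        return Arrays.copyOf(selectedFields, selectedFields.length);
    }
}
```

With this shape, a call such as `source.projectFields(new int[] {2, 0})` leaves `source` untouched and hands back a separate, narrower source.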




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178630#comment-16178630
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699826
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
--- End diff --

It also makes sense to standardize the naming of the configuration field 
(e.g., `conf`).




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178633#comment-16178633
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699619
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
--- End diff --

Mark the fields as final.

Also drop the fully qualified name, since `Configuration` is already imported: 
s/org.apache.hadoop.conf.//g




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178614#comment-16178614
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140697648
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
+   private TypeDescription schema;
+   private int[] fieldMapping;
+
+   private transient RowTypeInfo rowType;
+   private transient RecordReader orcRowsReader;
+   private transient VectorizedRowBatch rowBatch;
+   private transient Row[] rows;
+
+   private transient int rowInBatch;
+
+   public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig);
+   }
+
+   public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) {
+   super(new Path(path));
+   this.unsplittable = false;
+   this.schema = orcSchema;
+   this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+   this.config = orcConfig;
+
+   this.fieldMapping = new int[this.schema.getChildren().size()];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   this.fieldMapping[i] = i;
+   }
+
+   }
+
+   public void setFieldMapping(int[] fieldMapping) {
+   this.fieldMapping = fieldMapping;
--- End diff --

This function can be refactored into a common function.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178634#comment-16178634
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140698526
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
--- End diff --

The size of the batch needs to be configurable.
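
One way this could be approached is sketched below with a hypothetical `BatchedReaderSettings` class; the class, constructor, and method names are assumptions and not part of the pull request. The previous constant is kept as the default, while a second constructor accepts a validated, caller-supplied batch size.

```java
// Hypothetical sketch; the class and parameter names are assumptions, not the PR's API.
final class BatchedReaderSettings {

    // The value that was hard-coded before is kept as the default.
    static final int DEFAULT_BATCH_SIZE = 1024;

    private final int batchSize;

    BatchedReaderSettings() {
        this(DEFAULT_BATCH_SIZE);
    }

    BatchedReaderSettings(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("Batch size must be positive, but was " + batchSize);
        }
        this.batchSize = batchSize;
    }

    int getBatchSize() {
        return batchSize;
    }

    /** Number of batches needed to read the given number of rows (rounded up). */
    long batchesFor(long rowCount) {
        return (rowCount + batchSize - 1) / batchSize;
    }
}
```

A caller that does not care about tuning would use the no-argument constructor, while a memory-constrained job could pass a smaller value such as `new BatchedReaderSettings(256)`.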




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178612#comment-16178612
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140697285
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Creates a TableSource to read ORC file.
+ *
+ * The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ *
+ * The OrcTableSource is used as shown in the example below.
+ *
+ * 
+ * {@code
+ * String path = testInputURL.getPath();
+ * String schema = 
"struct"
+ * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * tEnv.registerTableSource("orcTable", orcSrc);
+ * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * }
+ * 
+ */
+public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private String path;
+   private TypeDescription orcSchema;
+   private RowTypeInfo typeInfo;
+   private Configuration orcConfig;
+   private int[] fieldMapping;
+
+   /**
+* The ORC file path and schema.
+*
+* @param path  the path of orc file
+* @param orcSchema schema of orc file
+*/
+   public OrcTableSource(String path, String orcSchema) {
+   this(path, orcSchema, new Configuration());
+   }
+
+   /**
+* The file path and schema of orc file, and configuration to read orc file .
+*
+* @param path  the path of orc file
+* @param orcSchema schema of orc file
+* @param orcConfig configuration to read orc file
+*/
+   public OrcTableSource(String path, String orcSchema, Configuration orcConfig) {
+   this(path, TypeDescription.fromString(orcSchema), orcConfig);
+   }
+
+   public OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig) {
+   this.path = path;
+   this.orcSchema = orcSchema;
+   this.orcConfig = orcConfig;
+
+   this.typeInfo = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+
+   }
+
+   @Override
+   public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+
--- End diff --

nit: additional empty line.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178619#comment-16178619
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140698436
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat<Row>
+   implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
--- End diff --

Drop the fully qualified name here, since `Configuration` is already imported: 
s/org.apache.hadoop.conf.//




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178632#comment-16178632
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140700434
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,2229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+public class OrcUtils {
+
+   /**
+* Convert ORC schema types to Flink types.
+*
+* @param schema schema of orc file
+*
+*/
+   public static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+   return SqlTimeTypeInfo.TIMESTAMP;
+   case BINARY:
+   return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+   case STRUCT:
+   List<TypeDescription> fieldSchemas = schema.getChildren();
+   TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
+   for (int i = 0; i < fieldSchemas.size(); i++) {
+   

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178631#comment-16178631
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699402
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -0,0 +1,152 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.4-SNAPSHOT
+   ..
+   
+
+   flink-orc_${scala.binary.version}
+   flink-orc
+
+   jar
+
+   
+
+   
+
+   
+   org.apache.flink
+   
flink-table_${scala.binary.version}
+   ${project.version}
+   compile
+   
+
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   compile
+   
+
+   
+   org.apache.orc
+   orc-core
+   1.4.0
+   
+
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   
flink-test-utils_${scala.binary.version}
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   test
+   test-jar
+   
+   
+
+   
+
+   
--- End diff --

Unnecessary changes.


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178626#comment-16178626
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699453
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Creates a TableSource to read ORC file.
+ *
+ * The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ *
+ * The OrcTableSource is used as shown in the example below.
+ *
+ * 
+ * {@code
+ * String path = testInputURL.getPath();
+ * String schema = 
"struct"
+ * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * tEnv.registerTableSource("orcTable", orcSrc);
+ * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * }
+ * 
+ */
+public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private String path;
--- End diff --

Mark the fields as final
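
A minimal sketch of what this suggestion might look like, using a hypothetical stand-in class (`OrcSourceSettings` is not part of the PR) instead of the real `OrcTableSource`. Fields that are assigned exactly once in the constructor are declared `final`, so the compiler rejects any later reassignment. In the PR, `fieldMapping` is set after construction in `projectFields`, so making it `final` would presumably mean passing it through a constructor, which is what this sketch assumes.

```java
// Hypothetical stand-in for a table source; not the PR's OrcTableSource.
final class OrcSourceSettings {

    // Assigned exactly once in the constructor, so they can be final.
    private final String path;
    private final String schema;
    private final int[] fieldMapping;

    OrcSourceSettings(String path, String schema, int[] fieldMapping) {
        this.path = path;
        this.schema = schema;
        // Defensive copy: final protects the reference, not the array contents.
        this.fieldMapping = fieldMapping.clone();
    }

    // Returns a new instance instead of mutating a field after construction.
    OrcSourceSettings withFieldMapping(int[] newMapping) {
        return new OrcSourceSettings(path, schema, newMapping);
    }

    String getPath() {
        return path;
    }

    String getSchema() {
        return schema;
    }

    int[] getFieldMapping() {
        return fieldMapping.clone();
    }
}
```

With this shape, a projection produces a fresh object and the original stays unchanged, which is one way to keep every field final.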




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178620#comment-16178620
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140700095
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat
+   implements ResultTypeQueryable {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
+   private TypeDescription schema;
+   private int[] fieldMapping;
+
+   private transient RowTypeInfo rowType;
+   private transient RecordReader orcRowsReader;
+   private transient VectorizedRowBatch rowBatch;
+   private transient Row[] rows;
+
+   private transient int rowInBatch;
+
+   public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig);
+   }
+
+   public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) {
+   super(new Path(path));
+   this.unsplittable = false;
+   this.schema = orcSchema;
+   this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+   this.config = orcConfig;
+
+   this.fieldMapping = new int[this.schema.getChildren().size()];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   this.fieldMapping[i] = i;
+   }
+
+   }
+
+   public void setFieldMapping(int[] fieldMapping) {
+   this.fieldMapping = fieldMapping;
+   // adapt result type
+
+   TypeInformation[] fieldTypes = new TypeInformation[fieldMapping.length];
+   String[] fieldNames = new String[fieldMapping.length];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
+   fieldNames[i] = this.rowType.getFieldNames()[fieldMapping[i]];
+   }
+   this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   private boolean[] computeProjectionMask() {
+   boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+   for (int inIdx : fieldMapping) {
+   TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+   for (int i = 

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178621#comment-16178621
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699577
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
--- End diff --

This can be a package-private class.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178613#comment-16178613
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140698040
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC data.
+ * For Optimization, reading is done in batch instead of a single row.
+ */
+public class RowOrcInputFormat
+   extends FileInputFormat
+   implements ResultTypeQueryable {
+
+   private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
+   private static final int BATCH_SIZE = 1024;
+
+   private org.apache.hadoop.conf.Configuration config;
+   private TypeDescription schema;
+   private int[] fieldMapping;
+
+   private transient RowTypeInfo rowType;
+   private transient RecordReader orcRowsReader;
+   private transient VectorizedRowBatch rowBatch;
+   private transient Row[] rows;
+
+   private transient int rowInBatch;
+
+   public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig);
+   }
+
+   public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) {
+   super(new Path(path));
+   this.unsplittable = false;
+   this.schema = orcSchema;
+   this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+   this.config = orcConfig;
+
+   this.fieldMapping = new int[this.schema.getChildren().size()];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   this.fieldMapping[i] = i;
+   }
+
+   }
+
+   public void setFieldMapping(int[] fieldMapping) {
+   this.fieldMapping = fieldMapping;
+   // adapt result type
+
+   TypeInformation[] fieldTypes = new TypeInformation[fieldMapping.length];
+   String[] fieldNames = new String[fieldMapping.length];
+   for (int i = 0; i < fieldMapping.length; i++) {
+   fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
+   fieldNames[i] = this.rowType.getFieldNames()[fieldMapping[i]];
+   }
+   this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   private boolean[] computeProjectionMask() {
+   boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+   for (int inIdx : fieldMapping) {
+   TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+   for (int i = 

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178625#comment-16178625
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699558
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Creates a TableSource to read ORC file.
+ *
+ * The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ *
+ * The OrcTableSource is used as shown in the example below.
+ *
+ * 
+ * {@code
+ * String path = testInputURL.getPath();
+ * String schema = 
"struct"
+ * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * tEnv.registerTableSource("orcTable", orcSrc);
+ * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * }
+ * 
+ */
+public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private String path;
+   private TypeDescription orcSchema;
+   private RowTypeInfo typeInfo;
+   private Configuration orcConfig;
+   private int[] fieldMapping;
+
+   /**
+* The ORC file path and schema.
+*
+* @param path  the path of orc file
+* @param orcSchema schema of orc file
+*/
+   public OrcTableSource(String path, String orcSchema) {
+   this(path, orcSchema, new Configuration());
+   }
+
+   /**
+* The file path and schema of orc file, and configuration to read orc file .
+*
+* @param path  the path of orc file
+* @param orcSchema schema of orc file
+* @param orcConfig configuration to read orc file
+*/
+   public OrcTableSource(String path, String orcSchema, Configuration orcConfig) {
+   this(path, TypeDescription.fromString(orcSchema), orcConfig);
+   }
+
+   public OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig) {
+   this.path = path;
+   this.orcSchema = orcSchema;
+   this.orcConfig = orcConfig;
+
+   this.typeInfo = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+
+   }
+
+   @Override
+   public DataSet getDataSet(ExecutionEnvironment execEnv) {
+
+   RowOrcInputFormat orcIF = new RowOrcInputFormat(path, orcSchema, orcConfig);
+   if (fieldMapping != null) {
+   orcIF.setFieldMapping(fieldMapping);
+   }
+   return execEnv.createInput(orcIF);
+   }
+
+   @Override
+   public TypeInformation getReturnType() {
+   return typeInfo;
+   }
+
+   @Override
+   public TableSource projectFields(int[] fields) {
+
+   OrcTableSource copy = new OrcTableSource(path, orcSchema, orcConfig);
+
+   // set field mapping
+   copy.fieldMapping = fields;
+
+   // adapt TypeInfo
--- End diff --

This function can be refactored into a common function.
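
One way to read this comment: both `OrcTableSource.projectFields` and `RowOrcInputFormat.setFieldMapping` adapt names and types to a field mapping, so that logic could live in one shared helper. The sketch below is only an illustration under that assumption. `ProjectionUtil` and `project` are hypothetical names, and plain strings stand in for Flink's `TypeInformation` so the example stays self-contained.

```java
// Hypothetical shared helper; plain strings stand in for Flink's TypeInformation.
final class ProjectionUtil {

    private ProjectionUtil() {
        // Utility class, not meant to be instantiated.
    }

    // Picks the entries of `all` selected by `fieldMapping`, in mapping order.
    static String[] project(String[] all, int[] fieldMapping) {
        String[] projected = new String[fieldMapping.length];
        for (int i = 0; i < fieldMapping.length; i++) {
            projected[i] = all[fieldMapping[i]];
        }
        return projected;
    }
}
```

Calling `ProjectionUtil.project(names, mapping)` and `ProjectionUtil.project(types, mapping)` with the same mapping keeps field names and field types aligned, and both callers share a single implementation.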



[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178636#comment-16178636
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699131
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,2229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+public class OrcUtils {
+
+   /**
+* Convert ORC schema types to Flink types.
+*
+* @param schema schema of orc file
+*
+*/
+   public static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+   return SqlTimeTypeInfo.TIMESTAMP;
+   case BINARY:
+   return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+   case STRUCT:
+   List<TypeDescription> fieldSchemas = schema.getChildren();
+   TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
+   for (int i = 0; i < fieldSchemas.size(); i++) {
+   

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178616#comment-16178616
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140699511
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Creates a TableSource to read ORC file.
+ *
+ * The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ *
+ * The OrcTableSource is used as shown in the example below.
+ *
+ * 
+ * {@code
+ * String path = testInputURL.getPath();
+ * String schema = 
"struct"
+ * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * tEnv.registerTableSource("orcTable", orcSrc);
+ * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * }
+ * 
+ */
+public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private String path;
+   private TypeDescription orcSchema;
+   private RowTypeInfo typeInfo;
+   private Configuration orcConfig;
+   private int[] fieldMapping;
+
+   /**
+* The ORC file path and schema.
+*
+* @param path  the path of orc file
+* @param orcSchema schema of orc file
+*/
+   public OrcTableSource(String path, String orcSchema) {
--- End diff --

Creating a new instance of `Configuration` is expensive. I would force the 
users to pass it in.
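
The trade-off in this comment can be sketched as follows. This is an illustration only: `ExpensiveConfig` is a hypothetical stand-in for an object that is costly to build (the real `org.apache.hadoop.conf.Configuration` is not used here), and `ConfiguredSource` is a hypothetical source whose only constructor requires the caller to supply the configuration, so one instance can be shared across several sources.

```java
// Hypothetical stand-in for a configuration object that is costly to create.
final class ExpensiveConfig {
    // Counts constructions so the sharing effect can be observed.
    static int instancesCreated = 0;

    ExpensiveConfig() {
        instancesCreated++;
    }
}

// Hypothetical source: no convenience constructor that builds its own config.
final class ConfiguredSource {
    private final String path;
    private final ExpensiveConfig config;

    ConfiguredSource(String path, ExpensiveConfig config) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        this.path = path;
        this.config = config;
    }

    String getPath() {
        return path;
    }

    ExpensiveConfig getConfig() {
        return config;
    }
}
```

A caller that creates one `ExpensiveConfig` and passes it to every `ConfiguredSource` pays the construction cost once, whereas a default constructor calling `new ExpensiveConfig()` would pay it per source.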




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178624#comment-16178624
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4670#discussion_r140697494
  
--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Creates a TableSource to read ORC file.
+ *
+ * The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ *
+ * The OrcTableSource is used as shown in the example below.
+ *
+ * 
+ * {@code
+ * String path = testInputURL.getPath();
+ * String schema = 
"struct"
+ * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * tEnv.registerTableSource("orcTable", orcSrc);
+ * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * }
+ * 
+ */
+public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private String path;
+   private TypeDescription orcSchema;
+   private RowTypeInfo typeInfo;
+   private Configuration orcConfig;
+   private int[] fieldMapping;
+
+   /**
+* The ORC file path and schema.
+*
+* @param path  the path of orc file
+* @param orcSchema schema of orc file
+*/
+   public OrcTableSource(String path, String orcSchema) {
--- End diff --

Creating a new `Configuration` instance is very expensive. It makes sense to 
force the users to pass it in.




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166005#comment-16166005
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

GitHub user uybhatti opened a pull request:

https://github.com/apache/flink/pull/4670

[FLINK-2170] [connectors] Add ORC connector for TableSource

## What is the purpose of the change
Currently, Flink cannot read data from ORC files. This PR adds support for 
loading data from ORC files into a TableSource.


## Brief change log
  - The RowOrcInputFormat, OrcUtils and OrcTableSource classes implement the 
above functionality. OrcTableSource also implements the ProjectableTableSource 
and FilterableTableSource interfaces.
  - As an optimization, ORC files are read in batches instead of a 
single row at a time.
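
The batch-wise reading and the projection mentioned above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's `RowOrcInputFormat`: `BatchSource`, `BatchedRowIterator` and `inMemory` are hypothetical names, rows are plain `Object[]` arrays, and the projection is applied after a batch has been fetched, whereas an ORC reader would normally skip unselected columns while reading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class BatchedRowReaderSketch {

    /** Hypothetical batch source: returns up to maxRows rows per call, an empty list at the end. */
    interface BatchSource {
        List<Object[]> nextBatch(int maxRows);
    }

    /** Hands out one projected row at a time while fetching rows from the source in batches. */
    static final class BatchedRowIterator implements Iterator<Object[]> {
        private final BatchSource source;
        private final int batchSize;
        private final int[] fieldMapping; // output position -> column index in the full row
        private List<Object[]> batch = new ArrayList<>();
        private int nextRow = 0;
        private boolean exhausted = false;

        BatchedRowIterator(BatchSource source, int batchSize, int[] fieldMapping) {
            this.source = source;
            this.batchSize = batchSize;
            this.fieldMapping = fieldMapping.clone();
        }

        @Override
        public boolean hasNext() {
            if (nextRow < batch.size()) {
                return true; // rows left in the current batch
            }
            if (exhausted) {
                return false;
            }
            batch = source.nextBatch(batchSize); // one source call per batch, not per row
            nextRow = 0;
            if (batch.isEmpty()) {
                exhausted = true;
                return false;
            }
            return true;
        }

        @Override
        public Object[] next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Object[] full = batch.get(nextRow++);
            Object[] projected = new Object[fieldMapping.length];
            for (int i = 0; i < fieldMapping.length; i++) {
                projected[i] = full[fieldMapping[i]];
            }
            return projected;
        }
    }

    /** In-memory batch source, used only to make the sketch runnable. */
    static BatchSource inMemory(List<Object[]> rows) {
        int[] position = {0};
        return maxRows -> {
            int end = Math.min(position[0] + maxRows, rows.size());
            List<Object[]> out = new ArrayList<>(rows.subList(position[0], end));
            position[0] = end;
            return out;
        };
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            rows.add(new Object[] {i, "name-" + i, i * 1.5});
        }
        // Keep only columns 2 and 0, in that order, and fetch two rows per batch.
        Iterator<Object[]> it = new BatchedRowIterator(inMemory(rows), 2, new int[] {2, 0});
        while (it.hasNext()) {
            System.out.println(Arrays.toString(it.next()));
        }
    }
}
```

The caller still consumes one row at a time, but the source is asked for data once per batch rather than once per row, which is the point of the optimization.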

## Verifying this change
This change added tests and can be verified as follows:
  - RowOrcInputFormatTest verifies that reading different data types, 
nested data types, and projections is correct.
  - OrcTableSourceTest and OrcTableSourceITCase verify that 
ORC data is loaded into the TableSource correctly.
  
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **yes**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  **no**
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **not documented**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uybhatti/flink FLINK-2170

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4670.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4670


commit 6629db4daf99960f0251948cb4569eb7c6efbade
Author: Fabian Hueske 
Date:   2017-03-03T22:55:22Z

[FLINK-2170] [connectors] Add ORC connector for TableSource






[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-07-10 Thread Usman Younas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080656#comment-16080656
 ] 

Usman Younas commented on FLINK-2170:
-

Hi [~fhueske] I'd like to work on OrcTableSource and will continue with your 
implementation. Please assign this issue to me.
Thanks, Usman

> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Owen O'Malley
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.





[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-03-06 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896903#comment-15896903
 ] 

Fabian Hueske commented on FLINK-2170:
--

Hi [~owen.omalley], I started with an implementation. 
It's still WIP and needs to be extended 
(https://github.com/fhueske/flink/tree/tableORCSource).
Would you be OK if I assign the issue to me? 

Thanks, Fabian




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-02-03 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851712#comment-15851712
 ] 

Robert Metzger commented on FLINK-2170:
---

[~owen.omalley] What's your status on this?




[jira] [Commented] (FLINK-2170) Add OrcTableSource

2016-09-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15515828#comment-15515828
 ] 

Fabian Hueske commented on FLINK-2170:
--

FLINK-3848 and FLINK-3849 have not been addressed yet. 
I don't think that anybody is currently working on them.



[jira] [Commented] (FLINK-2170) Add OrcTableSource

2016-06-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323351#comment-15323351
 ] 

Fabian Hueske commented on FLINK-2170:
--

Hi [~owen.omalley], we haven't added support for pushing filters and projection 
into a {{TableSource}} yet (FLINK-3848, FLINK-3849).
However, we can certainly start with an {{OrcTableSource}} and add these 
features later. 

I'll assign the issue to you. Thanks!

> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.





[jira] [Commented] (FLINK-2170) Add OrcTableSource

2016-06-09 Thread Owen O'Malley (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323281#comment-15323281
 ] 

Owen O'Malley commented on FLINK-2170:
--

If no one is working on this, I'd like to work on this.

> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Eric Falk
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.





[jira] [Commented] (FLINK-2170) Add OrcTableSource

2016-04-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262468#comment-15262468
 ] 

Fabian Hueske commented on FLINK-2170:
--

Updated the issue title and description to reflect the new {{TableSource}} 
interface.
