[GitHub] flink pull request #2686: [FLINK-4743] The sqrt/power function not accept th...

2016-10-24 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/2686

[FLINK-4743] The sqrt/power function not accept the real data types.

Add support for a variety of types in the "power" function. Add casting for Float
and BigDecimal.
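
A hedged illustration of what the change enables, written in the style of the PR's own tests below (f0 is a FLOAT column, f1 a DOUBLE, f4 a DECIMAL; the exact supported combinations are defined by the PR):

```
// SQRT on a FLOAT operand and POWER with a DECIMAL exponent should now
// validate and evaluate instead of being rejected.
testSqlApi("SQRT(f0)", "10.0")
testSqlApi("POWER(f1, f4)", "1.0E200")
```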

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-4743

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2686


commit e5cfd1106b3c69f599149b56eee2ede7936e1e63
Author: anton_solo...@epam.com 
Date:   2016-10-17T07:44:12Z

[FLINK-4743] Add supporting of variety types on "power" function. Add 
casting for Float and BigDecimal.






[GitHub] flink pull request #2686: [FLINK-4743] The sqrt/power function not accept th...

2016-10-28 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2686#discussion_r85514611
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/PowerExpressionTest.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import java.lang.{Double => JDouble, Float => JFloat, Long => JLong, Integer => JInteger}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, Types}
+import org.junit.Test
+
+class PowerExpressionTest extends ExpressionTestBase {
+
+  override def testData = {
+    val testData = new Row(5)
+    testData.setField(0, 100.0f: JFloat)
+    testData.setField(1, 100.0: JDouble)
+    testData.setField(2, 100L: JLong)
+    testData.setField(3, 100: JInteger)
+    testData.setField(4, new JBigDecimal(100): JBigDecimal)
+    testData
+  }
+
+  override def typeInfo = {
+    new RowTypeInfo(Seq(
+      Types.FLOAT,
+      Types.DOUBLE,
+      Types.LONG,
+      Types.INT,
+      Types.DECIMAL)
+    ).asInstanceOf[TypeInformation[Any]]
+  }
+
+  @Test
+  def testSqrtFloat(): Unit = {
+    testSqlApi(
+      "SQRT(f0)",
+      "10.0")
+  }
+
+  @Test
+  def testSqrtDouble(): Unit = {
+    testSqlApi(
--- End diff --

Yes, but I found another bug: with the Table API expression `'f1.sqrt()` where f1 is a
BigDecimal, the test throws a ValidationException. I wrote about it on the dev mailing list.
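
For reference, a minimal sketch of the manual workaround discussed later in this thread, assuming the Scala Table API's `cast` expression and `Types.DOUBLE` (illustrative; the exact syntax may differ by version):

```
// Hypothetical workaround: cast the DECIMAL field to DOUBLE before sqrt,
// so validation accepts the operand.
val result = table.select('f1.cast(Types.DOUBLE).sqrt())
```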




[GitHub] flink pull request #2720: [FLINK-4623] Create Physical Execution Plan of a D...

2016-10-28 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/2720

[FLINK-4623] Create Physical Execution Plan of a DataStream



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-4623

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2720.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2720


commit bdbfe89fbb61b04f3ee6f29b30382bf06983c2e5
Author: anton solovev 
Date:   2016-10-25T11:55:42Z

[FLINK-4623] Implement explain for StreamTableEnvironment. It shows 
Abstract Syntax Tree and Physical Execution Plan.






[GitHub] flink issue #2686: [FLINK-4743] The sqrt/power function not accept the real ...

2016-10-28 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2686
  
Hi @wuchong 

I agree that users can cast types manually, and that BuiltInMethods.POWER_DEC should be
bound to power(double, BigDecimal), but that is a Calcite problem:
https://issues.apache.org/jira/browse/CALCITE-1467?filter=-2
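
For context, a sketch of what such a binding could look like in BuiltInMethods, assuming Calcite's `Types.lookupMethod` utility and the Java helper class `MathFunctions` introduced later in this thread (the actual field in the PR may differ):

```
import java.math.{BigDecimal => JBigDecimal}
import org.apache.calcite.linq4j.tree.Types

// Bind POWER(double, DECIMAL) to a static Java helper so the code generator
// can emit a direct method call.
val POWER_DEC = Types.lookupMethod(
  classOf[MathFunctions], "power", classOf[Double], classOf[JBigDecimal])
```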





[GitHub] flink pull request #2686: [FLINK-4743] The sqrt/power function not accept th...

2016-10-28 Thread tonycox
Github user tonycox closed the pull request at:

https://github.com/apache/flink/pull/2686




[GitHub] flink pull request #2686: [FLINK-4743] The sqrt/power function not accept th...

2016-10-28 Thread tonycox
GitHub user tonycox reopened a pull request:

https://github.com/apache/flink/pull/2686

[FLINK-4743] The sqrt/power function not accept the real data types.

Add support for a variety of types in the "power" function. Add casting for Float
and BigDecimal.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-4743

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2686


commit e5cfd1106b3c69f599149b56eee2ede7936e1e63
Author: anton_solo...@epam.com 
Date:   2016-10-17T07:44:12Z

[FLINK-4743] Add supporting of variety types on "power" function. Add 
casting for Float and BigDecimal.






[GitHub] flink issue #2686: [FLINK-4743] The sqrt/power function not accept the real ...

2016-10-31 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2686
  
Hi @wuchong, the native Java implementation of the "power" function suits most cases,
so I deleted POWER_DEC from Flink because it has no proper types. This doesn't actually
make users' lives harder. Instead, I think the best way to solve this issue is to do what
Fabian said (https://goo.gl/hjtlOd): just cast the BigDecimal manually in the Table API,
and state in the exception message that users should cast. The tests of sqrt and power
with all types pass.
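
A sketch of the suggested exception message (wording and location are illustrative, not the PR's actual code):

```
// Hypothetical validation hint emitted when a DECIMAL operand reaches
// SQRT/POWER in the Table API.
throw new ValidationException(
  "SQRT/POWER do not accept DECIMAL operands in the Table API; " +
    "cast the operand first, e.g. 'f1.cast(Types.DOUBLE).sqrt()")
```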




[GitHub] flink issue #2686: [FLINK-4743] The sqrt/power function not accept the real ...

2016-10-31 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2686
  
@wuchong if we choose the second option, why not solve this issue in Calcite
(https://issues.apache.org/jira/browse/CALCITE-1468) instead of on the Flink side?




[GitHub] flink issue #2686: [FLINK-4743] The sqrt/power function not accept the real ...

2016-11-01 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2686
  
I agree with @wuchong that we need to create a new JIRA issue for supporting
`BigDecimal` in functions (`log`, `ln`, `exp`, etc.), but I suppose that once the Calcite
problem with `DECIMAL` is fixed, we won't have to repeat Calcite's work in Flink.




[GitHub] flink issue #2686: [FLINK-4743] The sqrt/power function not accept the real ...

2016-11-01 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2686
  
@wuchong, the second option is supported only in SQL; in the Table API we still cast
manually.
For the first option, where should I create the power(double, BigDecimal) function,
i.e. in which package or util class?
@fhueske, should the explicit cast of BigDecimal be applied everywhere, so that we
don't need `PowerCallGen` anymore?




[GitHub] flink pull request #2686: [FLINK-4743] The sqrt/power function not accept th...

2016-11-01 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2686#discussion_r86081029
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/functions/utils/MathFunctions.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils;
+
+import java.math.BigDecimal;
+
+public class MathFunctions {
+
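+   /** Computes a^b via Math.pow; the BigDecimal exponent is narrowed to double, which may lose precision. */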
+   public static double power(double a, BigDecimal b) {
+       return Math.pow(a, b.doubleValue());
+   }
+
+}
--- End diff --

We can't call Scala from the generated Java code because of how Calcite compiles it.




[GitHub] flink pull request #2686: [FLINK-4743] The sqrt/power function not accept th...

2016-11-02 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2686#discussion_r86090398
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/functions/utils/MathFunctions.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils;
+
+import java.math.BigDecimal;
+
+public class MathFunctions {
+
+   public static double power(double a, BigDecimal b) {
+       return Math.pow(a, b.doubleValue());
+   }
+
+}
--- End diff --

I just implemented it in BuiltInMethods, see
https://github.com/tonycox/flink/blob/4ebfd81e04d94161827f7bd55ee1146eedafeeb0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala




[GitHub] flink pull request #2686: [FLINK-4743] The sqrt/power function not accept th...

2016-11-02 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2686#discussion_r86108721
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/functions/utils/MathFunctions.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils;
+
+import java.math.BigDecimal;
+
+public class MathFunctions {
+
+   public static double power(double a, BigDecimal b) {
+       return Math.pow(a, b.doubleValue());
+   }
+
+}
--- End diff --

Why I didn't write the `power` function in a Scala class:

I ran into a problem with static access to the method. I tried rewriting
`MathFunctions.java` as a Scala object (https://goo.gl/rmLy8f).
In BuiltInMethods we cannot use `classOf[]` with a Scala `object`, so I used
`getClass()`. The test then throws an [exception](http://pastebin.com/fLNr4CfX),
because `getClass()` doesn't return `MathFunctions` itself; it returns the singleton
class `MathFunctions$` (http://pastebin.com/LiZk45Fe).
**But** there is a workaround:
**But** there is a solving :)
```
val POWER_DEC =
  Types.lookupMethod(
    Class.forName(MathFunctions.getClass.getCanonicalName.dropRight(1)),
    "power", classOf[Double], classOf[JBigDecimal])
```
I prefer the Java approach; what do you think, @wuchong @fhueske?
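
For background, a minimal sketch of why `getClass` misbehaves here, assuming standard Scala-to-JVM compilation (illustrative, not from the PR):

```
object MathFunctions {
  def power(a: Double, b: java.math.BigDecimal): Double =
    Math.pow(a, b.doubleValue())
}

// A Scala object compiles to a singleton class "MathFunctions$" plus a
// "MathFunctions" class carrying static forwarder methods. getClass returns
// the singleton, so reflective lookup of a static "power" fails on it:
MathFunctions.getClass.getName                              // "MathFunctions$"
// Dropping the trailing '$' recovers the forwarder class used above:
Class.forName(MathFunctions.getClass.getName.dropRight(1))  // class MathFunctions
```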




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-15 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/2810

[FLINK-3848] Add ProjectableTableSource interface and translation rule.

Extend CsvTableSource to implement projection

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-3848

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2810


commit 95f55a3783ff849357b58d06f3b8c99b20edbe5e
Author: tonycox 
Date:   2016-11-07T16:18:31Z

[FLINK-3848] Add ProjectableTableSource interface and translation rule. 
Extend CsvTableSource to implement projection






[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-20 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88841542
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    projects: util.List[_ <: RexNode],
+    projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+      projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+    new BatchTableProject(cluster, traitSet, input, projects, projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable])
+      .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+    projectableSource.setProjection(indexes)
+
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

@wuchong, actually, the `projection` works right after the `tableSourceScan`: it just
pushes the `fieldMask` into the `source` and prunes the columns in the logical plan,
so we don't need to add any type conversions.
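
For reference, a minimal sketch of the push-down contract under discussion, inferred from the `setProjection(indexes)` call in the diff above (the interface in the PR may differ):

```
/** A TableSource that can prune unused fields before emitting records. */
trait ProjectableTableSource[T] {

  /** Restricts the source to the given field indexes, in the requested order. */
  def setProjection(fields: Array[Int]): Unit
}
```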




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-21 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88849756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    projects: util.List[_ <: RexNode],
+    projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+      projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+    new BatchTableProject(cluster, traitSet, input, projects, projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable])
+      .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+    projectableSource.setProjection(indexes)
+
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

So it will turn into a scan and take the first step in the logical plan, right?




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-11-22 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
> In addition, the TableSource should know project information (including 
order) not just fieldIncluded. So maybe we should also adapt RowCsvInputFormat.

Do you mean we need to shuffle the fields in the row according to the projection while
scanning the file? Why not map the row fields afterwards?
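
A minimal sketch of the map-based alternative, assuming the projected field indexes are available as `projection: Array[Int]` (illustrative only):

```
// Reorder and prune fields with a map after the scan, instead of teaching
// RowCsvInputFormat about the output order.
val reordered: DataSet[Row] = scanned.map { row =>
  val out = new Row(projection.length)
  for (i <- projection.indices) {
    out.setField(i, row.getField(projection(i)))
  }
  out
}
```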




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-11-22 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
@wuchong I addressed the items you noted.




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-11-25 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
@wuchong could you review my changes?
cc @fhueske @StephanEwen 




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-11-28 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @wuchong, thank you for the review.
Do you mean only for `Stream`?
And why shouldn't we override computeSelfCost for DataStreamRel?




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-29 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r90010696
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
+import org.apache.calcite.rex.RexInputRef
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+import scala.collection.JavaConverters._
+
+/** Rule to convert a [[LogicalTableScan]] with [[LogicalProject]]
+  * into a [[BatchProjectableTableSourceScan]].
+  */
+class BatchProjectableTableSourceScanRule
+  extends RelOptRule(
+    operand(classOf[LogicalProject], operand(classOf[TableScan], none())),
+    "BatchProjectableTableSourceScanRule") {
+
+  /** Rule must only match if TableScan targets a [[BatchTableSource]],
+    * LogicalProject targets a [[ProjectableTableSource]] and all operands are [[RexInputRef]]
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
+    val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    dataSetTable match {
+      case tst: TableSourceTable =>
+        tst.tableSource match {
+          case s: BatchTableSource[_] =>
+            s match {
+              case p: ProjectableTableSource[_] =>
+                project.getProjects.asScala.forall(_.isInstanceOf[RexInputRef])
+              case _ =>
+                false
+            }
+          case _ =>
+            false
+        }
+      case _ =>
+        false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val project = call.rel(0).asInstanceOf[LogicalProject]
+    val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+
+    val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE)
+    val traitSet: RelTraitSet = project.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+    val newRel = new BatchProjectableTableSourceScan(
--- End diff --

How can I adapt the RexProgram in the same rule?
Should I use `transformTo()` for `BatchProjectableTableSourceScan` and then for a
custom `LogicalCalc`?




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-05 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @fhueske, I see the PR check reports conflicts; should I rebase the new commits,
or merge?




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-05 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
@wuchong, @fhueske, a question about field shuffling: should I shuffle the fields in
RowCsvInputFormat by setting a scan order, or let the LogicalCalc do it?




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-05 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
I see, @fhueske. Setting an order for the scan is the much better way.
I have a problem when the plan is chosen: the best expression is not the correct one.
The rules work, but the planner decides to choose the `MapFunction` anyway, without
the projection.
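
One way to steer the planner is to make the pushed-down scan strictly cheaper than the equivalent scan-plus-MapFunction plan, e.g. with a cost discount like the 0.8 factor already used in BatchTableProject above (a sketch, assuming Calcite's cost API):

```
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
  // Discount the cost so the optimizer prefers the projected scan over an
  // equivalent scan followed by a MapFunction.
  super.computeSelfCost(planner, mq).multiplyBy(0.8)
}
```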




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-06 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @fhueske, your PR does solve my problem. Thanks!




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-06 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
@fhueske Basically yes, but I think we need to finish the projection rule for Stream
after FLINK-5251 is merged.




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-07 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @fhueske and @KurtYoung I totally agree with the plan




[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-08 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
#2926 




[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-08 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91548803
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Comparator for {@link Row}
+ */
+public class RowComparator extends CompositeTypeComparator<Row> {
+
+   private static final long serialVersionUID = 1L;
+   /** The number of fields of the Row */
+   private final int arity;
+   /** key positions describe which fields are keys in what order */
+   private final int[] keyPositions;
+   /** null-aware comparators for the key fields, in the same order as the key fields */
+   private final NullAwareComparator[] comparators;
+   /** serializers to deserialize the first n fields for comparison */
+   private final TypeSerializer[] serializers;
+   /** auxiliary fields for normalized key support */
+   private final int[] normalizedKeyLengths;
+   private final int numLeadingNormalizableKeys;
+   private final int normalizableKeyPrefixLen;
+   private final boolean invertNormKey;
+
+   // null masks for serialized comparison
+   private final boolean[] nullMask1;
+   private final boolean[] nullMask2;
+
+   // cache for the deserialized key field objects
+   transient private final Object[] deserializedKeyFields1;
+   transient private final Object[] deserializedKeyFields2;
+
+   /**
+    * General constructor for RowComparator.
+    *
+    * @param arity        the number of fields of the Row
+    * @param keyPositions key positions describe which fields are keys in what order
+    * @param comparators  non-null-aware comparators for the key fields, in the same order as
+    *                     the key fields
+    * @param serializers  serializers to deserialize the first n fields for comparison
+    * @param orders       sorting orders for the fields
+    */
+   public RowComparator(
+           int arity,
+           int[] keyPositions,
+           TypeComparator[] comparators,
+           TypeSerializer[] serializers,
+           boolean[] orders) {
+       this(arity, keyPositions, makeNullAware(comparators, orders), serializers);
+   }
+
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComparator[] comparators,
+   TypeSerializer[] serializers) {
+   this(
+   arity,
+   keyPositions,
+   comparators,
+   serializers,
+   createAuxiliaryFields(keyPositions, comparators));
+   }
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComp

[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-08 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91544114
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * TypeInformation for {@link Row}
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+   private static final long serialVersionUID = 9158518989896601963L;
+
+   protected final String[] fieldNames;
+   /** Temporary variable for directly passing orders to comparators. */
+   private boolean[] comparatorOrders = null;
+
+   public RowTypeInfo(TypeInformation<?>... types) {
+   super(Row.class, types);
+
+   this.fieldNames = new String[types.length];
+
+   for (int i = 0; i < types.length; i++) {
+   fieldNames[i] = "f" + i;
+   }
+   }
+
+   @Override
+   public TypeComparator createComparator(
+   int[] logicalKeyFields,
+   boolean[] orders,
+   int logicalFieldOffset,
+   ExecutionConfig config) {
+   comparatorOrders = orders;
+   TypeComparator comparator = super.createComparator(
+   logicalKeyFields,
+   orders,
+   logicalFieldOffset,
+   config);
+   comparatorOrders = null;
+   return comparator;
+   }
+
+   @Override
+   protected TypeComparatorBuilder createTypeComparatorBuilder() {
+   if (comparatorOrders == null) {
+   throw new IllegalStateException("Cannot create 
comparator builder without orders.");
+   }
+   return new RowTypeComparatorBuilder(comparatorOrders);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public int getFieldIndex(String fieldName) {
+   for (int i = 0; i < fieldNames.length; i++) {
+   if (fieldNames[i].equals(fieldName)) {
+   return i;
--- End diff --

What if we have more than one field with the same name?
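
A sketch of one possible guard: rejecting duplicate names when the type info is constructed (illustrative; not part of the quoted diff, and assumes java.util.Set/HashSet are imported):

```
// Hypothetical check for RowTypeInfo: fail fast on duplicate field names so
// that getFieldIndex stays unambiguous.
private static void checkNoDuplicateNames(String[] fieldNames) {
	Set<String> seen = new HashSet<>();
	for (String name : fieldNames) {
		if (!seen.add(name)) {
			throw new IllegalArgumentException("Duplicate field name: " + name);
		}
	}
}
```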




[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-08 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91550465
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A Row has no fixed length and contains a set of fields, which may all be of
+ * different types. Because Row is not strongly typed, Flink's type extraction
+ * mechanism can't extract correct field types, so users should manually tell
+ * Flink the type information by creating a {@link RowTypeInfo}.
+ *
+ * 
+ * The fields in the Row may be accessed by position (zero-based) via
+ * {@link #getField(int)} and set via {@link #setField(int, Object)}.
+ * 
+ * Row is in principle serializable. However, it may contain non-serializable
+ * fields, in which case serialization will fail.
+ *
+ */
+@PublicEvolving
+public class Row implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Number of fields. */
+   private final int arity;
--- End diff --

`fields.length` is the same as `arity`, so storing arity separately seems redundant.
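
That is, a sketch of the suggestion, assuming the private `Object[] fields` array backing the Row (illustrative):

```
// Derive the arity from the backing array instead of storing it separately.
public int getArity() {
	return fields.length;
}
```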




[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-08 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91551621
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Comparator for {@link Row}
+ */
+public class RowComparator extends CompositeTypeComparator<Row> {
+
+   private static final long serialVersionUID = 1L;
+   /** The number of fields of the Row */
+   private final int arity;
+   /** key positions describe which fields are keys in what order */
+   private final int[] keyPositions;
+   /** null-aware comparators for the key fields, in the same order as the key fields */
+   private final NullAwareComparator[] comparators;
+   /** serializers to deserialize the first n fields for comparison */
+   private final TypeSerializer[] serializers;
+   /** auxiliary fields for normalized key support */
+   private final int[] normalizedKeyLengths;
+   private final int numLeadingNormalizableKeys;
+   private final int normalizableKeyPrefixLen;
+   private final boolean invertNormKey;
+
+   // null masks for serialized comparison
+   private final boolean[] nullMask1;
+   private final boolean[] nullMask2;
+
+   // cache for the deserialized key field objects
+   transient private final Object[] deserializedKeyFields1;
+   transient private final Object[] deserializedKeyFields2;
+
+   /**
+    * General constructor for RowComparator.
+    *
+    * @param arity        the number of fields of the Row
+    * @param keyPositions key positions describe which fields are keys in what order
+    * @param comparators  non-null-aware comparators for the key fields, in the same order as
+    *                     the key fields
+    * @param serializers  serializers to deserialize the first n fields for comparison
+    * @param orders       sorting orders for the fields
+    */
+   public RowComparator(
+           int arity,
+           int[] keyPositions,
+           TypeComparator[] comparators,
+           TypeSerializer[] serializers,
+           boolean[] orders) {
+       this(arity, keyPositions, makeNullAware(comparators, orders), serializers);
+   }
+
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComparator[] comparators,
+   TypeSerializer[] serializers) {
+   this(
+   arity,
+   keyPositions,
+   comparators,
+   serializers,
+   createAuxiliaryFields(keyPositions, comparators));
+   }
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComp

[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-08 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91547968
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Comparator for {@link Row}
+ */
+public class RowComparator extends CompositeTypeComparator<Row> {
+
+   private static final long serialVersionUID = 1L;
+   /** The number of fields of the Row */
+   private final int arity;
+   /** key positions describe which fields are keys in what order */
+   private final int[] keyPositions;
+   /** null-aware comparators for the key fields, in the same order as the key fields */
+   private final NullAwareComparator[] comparators;
+   /** serializers to deserialize the first n fields for comparison */
+   private final TypeSerializer[] serializers;
+   /** auxiliary fields for normalized key support */
+   private final int[] normalizedKeyLengths;
+   private final int numLeadingNormalizableKeys;
+   private final int normalizableKeyPrefixLen;
+   private final boolean invertNormKey;
+
+   // null masks for serialized comparison
+   private final boolean[] nullMask1;
+   private final boolean[] nullMask2;
+
+   // cache for the deserialized key field objects
+   transient private final Object[] deserializedKeyFields1;
+   transient private final Object[] deserializedKeyFields2;
+
+   /**
+    * General constructor for RowComparator.
+    *
+    * @param arity        the number of fields of the Row
+    * @param keyPositions key positions describe which fields are keys in what order
+    * @param comparators  non-null-aware comparators for the key fields, in the same order as
+    *                     the key fields
+    * @param serializers  serializers to deserialize the first n fields for comparison
+    * @param orders       sorting orders for the fields
+    */
+   public RowComparator(
+           int arity,
+           int[] keyPositions,
+           TypeComparator[] comparators,
+           TypeSerializer[] serializers,
+           boolean[] orders) {
+       this(arity, keyPositions, makeNullAware(comparators, orders), serializers);
+   }
+
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComparator[] comparators,
+   TypeSerializer[] serializers) {
+   this(
+   arity,
+   keyPositions,
+   comparators,
+   serializers,
+   createAuxiliaryFields(keyPositions, comparators));
+   }
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComp

[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-08 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91547877
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Comparator for {@link Row}
+ */
+public class RowComparator extends CompositeTypeComparator<Row> {
+
+   private static final long serialVersionUID = 1L;
+   /** The number of fields of the Row */
+   private final int arity;
+   /** key positions describe which fields are keys in what order */
+   private final int[] keyPositions;
+   /** null-aware comparators for the key fields, in the same order as the 
key fields */
+   private final NullAwareComparator[] comparators;
+   /** serializers to deserialize the first n fields for comparison */
+   private final TypeSerializer[] serializers;
+   /** auxiliary fields for normalized key support */
+   private final int[] normalizedKeyLengths;
+   private final int numLeadingNormalizableKeys;
+   private final int normalizableKeyPrefixLen;
+   private final boolean invertNormKey;
+
+   // null masks for serialized comparison
+   private final boolean[] nullMask1;
+   private final boolean[] nullMask2;
+
+   // cache for the deserialized key field objects
+   transient private final Object[] deserializedKeyFields1;
+   transient private final Object[] deserializedKeyFields2;
+
+   /**
+* General constructor for RowComparator.
+*
+* @param arity the number of fields of the Row
+* @param keyPositions key positions describe which fields are keys in 
what order
+* @param comparators  non-null-aware comparators for the key fields, 
in the same order as
+* the key fields
+* @param serializers  serializers to deserialize the first n fields 
for comparison
+* @param orders   sorting orders for the fields
+*/
+   public RowComparator(
+   int arity,
+   int[] keyPositions,
+   TypeComparator[] comparators,
+   TypeSerializer[] serializers,
+   boolean[] orders) {
+   this(arity, keyPositions, makeNullAware(comparators, orders), 
serializers);
+   }
+
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComparator[] comparators,
+   TypeSerializer[] serializers) {
+   this(
+   arity,
+   keyPositions,
+   comparators,
+   serializers,
+   createAuxiliaryFields(keyPositions, comparators));
+   }
+
+   /**
+* Intermediate constructor for creating auxiliary fields.
+*/
+   private RowComparator(
+   int arity,
+   int[] keyPositions,
+   NullAwareComp

[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-08 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91546359
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * TypeInformation for {@link Row}
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+   private static final long serialVersionUID = 9158518989896601963L;
+
+   protected final String[] fieldNames;
+   /** Temporary variable for directly passing orders to comparators. */
+   private boolean[] comparatorOrders = null;
+
+   public RowTypeInfo(TypeInformation<?>... types) {
+   super(Row.class, types);
+
+   this.fieldNames = new String[types.length];
+
+   for (int i = 0; i < types.length; i++) {
+   fieldNames[i] = "f" + i;
--- End diff --

Can we somehow get the real names of the fields?
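
For illustration, a minimal Scala sketch of the kind of API this asks for (the names parameter is an assumption, not the current constructor):

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

// Hypothetical overload: user-supplied names instead of the generated f0, f1, ...
val types = Array[TypeInformation[_]](
  BasicTypeInfo.STRING_TYPE_INFO,
  BasicTypeInfo.INT_TYPE_INFO)
val info = new RowTypeInfo(types, Array("name", "age")) // names overload is assumed
```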


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2923: [FLINK-5220] [Table API & SQL] Flink SQL projection pushd...

2016-12-08 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2923
  
Hi @fhueske, this PR looks good to me


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-09 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91670483
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
@@ -326,8 +326,8 @@ public int getNormalizeKeyLen() {
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return numLeadingNormalizableKeys < keyPositions.length ||
-   normalizableKeyPrefixLen == Integer.MAX_VALUE ||
-   normalizableKeyPrefixLen > keyBytes;
+  normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+  normalizableKeyPrefixLen > keyBytes;
--- End diff --

There should be only tabs here instead of spaces.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-09 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91674126
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * TypeInformation for {@link Row}
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+   private static final long serialVersionUID = 9158518989896601963L;
+
+   protected final String[] fieldNames;
+   /** Temporary variable for directly passing orders to comparators. */
+   private boolean[] comparatorOrders = null;
+
+   public RowTypeInfo(TypeInformation<?>... types) {
--- End diff --

Please add another constructor with `Collection<TypeInformation<?>> types` so that, for example, `Seq(Types.INT, Types.LONG, Types.STRING)` can be passed from Scala.
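
A hedged sketch of the call site this would enable, assuming such a collection-based constructor were added:

```scala
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.table.Types

import scala.collection.JavaConversions._

// The implicit conversions turn the Scala Seq into a java.util.Collection:
val info = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
```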


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-09 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91704437
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * TypeInformation for {@link Row}
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+   private static final long serialVersionUID = 9158518989896601963L;
+
+   protected final String[] fieldNames;
+   /** Temporary variable for directly passing orders to comparators. */
+   private boolean[] comparatorOrders = null;
+
+   public RowTypeInfo(TypeInformation<?>... types) {
--- End diff --

A Scala `scala.collection.Seq` converts easily to a `java.util.List` with 
`import scala.collection.JavaConversions._` in Scala code.
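
For example, a minimal sketch of that conversion:

```scala
import java.util.{List => JList}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.Types

import scala.collection.JavaConversions._

val scalaSeq: Seq[TypeInformation[_]] = Seq(Types.INT, Types.LONG, Types.STRING)
val javaList: JList[TypeInformation[_]] = scalaSeq // implicit seqAsJavaList
```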


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-09 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91721721
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * TypeInformation for {@link Row}
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+   private static final long serialVersionUID = 9158518989896601963L;
+
+   protected final String[] fieldNames;
+   /** Temporary variable for directly passing orders to comparators. */
+   private boolean[] comparatorOrders = null;
+
+   public RowTypeInfo(TypeInformation<?>... types) {
--- End diff --

Also, when we pass an `Array[TypeInformation[_]]` to the constructor from Scala, it 
conflicts with the `TypeInformation<?>...` varargs, so I propose casting it with `.toList`.
What do you think?
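
A short sketch of the ambiguity and the proposed workaround (assuming the varargs and a collection overload exist side by side):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.table.Types

import scala.collection.JavaConversions._

val types: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING)

// Ambiguous from Scala: the array matches the TypeInformation<?>... varargs,
// but could also be converted towards a collection overload.
// val info = new RowTypeInfo(types)

// Proposed workaround: pick the collection overload explicitly via .toList.
val info = new RowTypeInfo(types.toList)
```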


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-09 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91748535
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A Row has no fixed length and contains a set of fields, which may all be of
+ * different types. Because Row is not strongly typed, Flink's type extraction
+ * mechanism cannot extract correct field types, so users should manually tell
+ * Flink the type information by creating a
+ * {@link RowTypeInfo}.
+ *
+ * 
+ * The fields in the Row may be accessed by position (zero-based) via
+ * {@link #getField(int)}, and can be set via {@link #setField(int, Object)}.
+ * 
+ * Row is in principle serializable. However, it may contain 
non-serializable fields,
+ * in which case serialization will fail.
+ *
+ */
+@PublicEvolving
+public class Row implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   /** The array to store actual values. */
+   private final Object[] fields;
+
+   /**
+* Create a new Row instance.
+* @param arity The number of fields in the Row
+*/
+   public Row(int arity) {
+   this.fields = new Object[arity];
+   }
+
+   /**
+* Get the number of fields in the Row.
+* @return The number of fields in the Row.
+*/
+   public int getArity() {
+   return fields.length;
+   }
+
+   /**
+* Gets the field at the specified position.
+* @param pos The position of the field, 0-based.
+* @return The field at the specified position.
+* @throws IndexOutOfBoundsException Thrown, if the position is 
negative, or equal to, or larger than the number of fields.
+*/
+   public Object getField(int pos) {
+   return fields[pos];
+   }
+
+   /**
+* Sets the field at the specified position.
+*
+* @param pos The position of the field, 0-based.
+* @param value The value to be assigned to the field at the specified 
position.
+* @throws IndexOutOfBoundsException Thrown, if the position is 
negative, or equal to, or larger than the number of fields.
+*/
+   public void setField(int pos, Object value) {
+   fields[pos] = value;
+   }
+
+   @Override
+   public String toString() {
+   return Arrays.deepToString(fields);
--- End diff --

What do you think about removing all '[' and ']' from the string?
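
For illustration, a sketch of the suggested formatting, i.e. comma-joined fields without the surrounding brackets (not the committed behaviour):

```scala
// Arrays.deepToString(fields) yields e.g. "[hello, 42, null]";
// joining the fields manually drops the '[' and ']'.
val fields: Array[AnyRef] = Array("hello", Integer.valueOf(42), null)
val rendered = fields.map(f => if (f == null) "null" else f.toString).mkString(",")
// rendered == "hello,42,null"
```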


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2968: [FLINK-5187] [core] Create analog of Row and RowTypeInfo ...

2016-12-12 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2968
  
Hi @wuchong, maybe we should also move `RowCsvInputFormat`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-12 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r91941722
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A Row has no fixed length and contains a set of fields, which may all be of
+ * different types. Because Row is not strongly typed, Flink's type extraction
+ * mechanism cannot extract correct field types, so users should manually tell
+ * Flink the type information by creating a
+ * {@link RowTypeInfo}.
+ *
+ * 
+ * The fields in the Row may be accessed by position (zero-based) via
+ * {@link #getField(int)}, and can be set via {@link #setField(int, Object)}.
+ * 
+ * Row is in principle serializable. However, it may contain 
non-serializable fields,
+ * in which case serialization will fail.
+ *
+ */
+@PublicEvolving
+public class Row implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   /** The array to store actual values. */
+   private final Object[] fields;
+
+   /**
+* Create a new Row instance.
+* @param arity The number of fields in the Row
+*/
+   public Row(int arity) {
+   this.fields = new Object[arity];
+   }
+
+   /**
+* Get the number of fields in the Row.
+* @return The number of fields in the Row.
+*/
+   public int getArity() {
+   return fields.length;
+   }
+
+   /**
+* Gets the field at the specified position.
+* @param pos The position of the field, 0-based.
+* @return The field at the specified position.
+* @throws IndexOutOfBoundsException Thrown, if the position is 
negative, or equal to, or larger than the number of fields.
+*/
+   public Object getField(int pos) {
+   return fields[pos];
+   }
+
+   /**
+* Sets the field at the specified position.
+*
+* @param pos The position of the field, 0-based.
+* @param value The value to be assigned to the field at the specified 
position.
+* @throws IndexOutOfBoundsException Thrown, if the position is 
negative, or equal to, or larger than the number of fields.
+*/
+   public void setField(int pos, Object value) {
+   fields[pos] = value;
+   }
+
+   @Override
+   public String toString() {
+   return Arrays.deepToString(fields);
--- End diff --

For example `org.apache.flink.api.scala.stream.table.AggregationsITCase` or 
`org.apache.flink.api.scala.batch.table.JoinITCase`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2968: [FLINK-5187] [core] Create analog of Row and RowTypeInfo ...

2016-12-12 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2968
  
@fhueske Yes. I already started 
https://github.com/tonycox/flink/tree/FLINK-5188


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-13 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @fhueske, what should I finish first, this PR or FLINK-5188?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3003: [Flink-5188] Create analog of RowCsvInputFormat in...

2016-12-13 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3003

[Flink-5188] Create analog of RowCsvInputFormat in java and adjust all the 
imports of Row and RowTypeInfo

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-5188

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3003.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3003


commit 6b3c98c43965d2adbed20019cf0e22b512ee7744
Author: Jark Wu 
Date:   2016-12-08T14:44:29Z

[FLINK-5187] [core] Create analog of Row and RowTypeInfo and RowComparator 
in core

commit 8da708eaac266b221b6146b4e32600c0fc118a75
Author: Jark Wu 
Date:   2016-12-09T03:43:09Z

address review comments and add RowTypeInfoTest

commit 83bbf7682b4577a320b3fd6227f17e47b73e35fe
Author: Jark Wu 
Date:   2016-12-09T11:29:58Z

fix tabs

commit 739af6821fe796f2d2dd17d2fcacc47ad1fa034b
Author: Jark Wu 
Date:   2016-12-12T14:48:38Z

address review comments

commit 64270234172fecdf0e7fe18c1d80463a6bb0572c
Author: Jark Wu 
Date:   2016-12-13T05:36:06Z

remove unused ArrayList import

commit 9187914e3a34ccfe43a963a4bff8080225457eb9
Author: tonycox 
Date:   2016-12-09T17:41:36Z

[FLINK-5188] Adjust Row and RowTypeInfo dependencies

commit 55dffb6c3c882dbf720a13b0f7ba850d5aa51cf7
Author: tonycox 
Date:   2016-12-13T12:05:58Z

Move RowCsvInputFormat to java.io




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3004: [Flink-5189] Delete Row and its related classes fr...

2016-12-14 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3004

[Flink-5189] Delete Row and its related classes from table api

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-5189

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3004.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3004


commit 6b3c98c43965d2adbed20019cf0e22b512ee7744
Author: Jark Wu 
Date:   2016-12-08T14:44:29Z

[FLINK-5187] [core] Create analog of Row and RowTypeInfo and RowComparator 
in core

commit 8da708eaac266b221b6146b4e32600c0fc118a75
Author: Jark Wu 
Date:   2016-12-09T03:43:09Z

address review comments and add RowTypeInfoTest

commit 83bbf7682b4577a320b3fd6227f17e47b73e35fe
Author: Jark Wu 
Date:   2016-12-09T11:29:58Z

fix tabs

commit 739af6821fe796f2d2dd17d2fcacc47ad1fa034b
Author: Jark Wu 
Date:   2016-12-12T14:48:38Z

address review comments

commit 64270234172fecdf0e7fe18c1d80463a6bb0572c
Author: Jark Wu 
Date:   2016-12-13T05:36:06Z

remove unused ArrayList import

commit 3317c2bdfa6275c121e151d22080fe9920f8075c
Author: tonycox 
Date:   2016-12-09T17:41:36Z

[FLINK-5188] Create analog of RowCsvInputFormat in java and adjust Row and 
RowTypeInfo imports

commit 9f60741d03b4ac6d84c79ec3f7b76fe74e8fa975
Author: tonycox 
Date:   2016-12-14T08:12:06Z

[FLINK-5189] Delete Row and its related classes from table api




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3003: [FLINK-5188] Create analog of RowCsvInputFormat in java a...

2016-12-14 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3003
  
Hi @fhueske okay, as you wish


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2926: [FLINK-5226] [table] Use correct DataSetCostFactory and i...

2016-12-14 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2926
  
@fhueske is there any sense in creating cost optimization for Stream?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3012: [FLINK-2186] Add readCsvAsRow methods to CsvReader...

2016-12-15 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3012

[FLINK-2186] Add readCsvAsRow methods to CsvReader and scala ExecutionEnv

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

Rework CSV import to support very wide files

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink FLINK-2186

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3012


commit 905e1fe5f530bcec92af3d4e3ebc8f2c0e26cdf9
Author: tonycox 
Date:   2016-12-12T11:51:56Z

[FLINK-2186] Add readCsvAsRow methods to CsvReader and scala ExecutionEnv.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-15 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
@fhueske I updated this PR according to the latest changes in master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3018: [FLINK-5336] Remove IOReadableWritable interface f...

2016-12-16 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3018

[FLINK-5336] Remove IOReadableWritable interface from Path

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

make Path's uri final

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink immutablePath

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3018.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3018


commit cd20c83edb52bf783850a136d0d57a5b111df5bf
Author: tonycox 
Date:   2016-12-16T09:20:53Z

[FLINK-5336] Remove IOReadableWritable interface from Path, make Path's uri 
final




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3020: [FLINK-5348] [core] Support custom field names for RowTyp...

2016-12-16 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3020
  
Hi @wuchong, in this case I think you should adapt `getFieldIndex`.
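
A minimal sketch of what adapting the lookup could mean, assuming the custom names live in a `fieldNames` array (illustrative, not the PR's code):

```scala
val fieldNames = Array("name", "age")

// Resolve a (custom) field name to its position; -1 if the name is absent.
def getFieldIndex(fieldName: String): Int = fieldNames.indexOf(fieldName)

assert(getFieldIndex("age") == 1)
assert(getFieldIndex("missing") == -1)
```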


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3020: [FLINK-5348] [core] Support custom field names for RowTyp...

2016-12-16 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3020
  
@wuchong sorry, my fault, I didn't spot the duplicate field name checks. What 
about casing? Is flink-table case-sensitive?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3027: [FLINK-5358] add RowTypeInfo extraction in TypeEx...

2016-12-19 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3027

[FLINK-5358] add RowTypeInfo extraction in TypeExtractor

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink rowTypeExtractor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3027.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3027


commit 7aa9992e681761e255f3a24492f202354e77ab2e
Author: tonycox 
Date:   2016-12-16T16:55:40Z

[FLINK-5358] add RowTypeInfo extraction in TypeExtractor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3012: [FLINK-2186] Add readCsvAsRow methods to CsvReader and sc...

2016-12-20 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3012
  
Hi @StephanEwen could you look at this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3027: [FLINK-5358] add RowTypeInfo extraction in TypeExtractor

2016-12-22 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3027
  
Hi @fhueske could you look at this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3027: [FLINK-5358] add RowTypeInfo exctraction in TypeEx...

2016-12-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3027#discussion_r93848593
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
 ---
@@ -345,8 +346,25 @@ public CustomType cross(CustomType first, Integer 
second) throws Exception {
 

Assert.assertFalse(TypeExtractor.getForClass(PojoWithNonPublicDefaultCtor.class)
 instanceof PojoTypeInfo);
}
-   
 
+   @Test
+   public void testRow() {
--- End diff --

In a different test method? There is already a check for a Row with a null field:

```java
Row nullRow = new Row(2);
TypeInformation<Row> genericRowInfo = TypeExtractor.getForObject(nullRow);
Assert.assertEquals(genericRowInfo, new GenericTypeInfo<>(Row.class));
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3044: [FLINK-5388] Change private Graph.<init>() to prot...

2016-12-26 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3044

[FLINK-5388] Change private Graph.<init>() to protected

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink privateGellyGraph

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3044.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3044


commit ab47e060a2587cb9dec93d6d4e837ed0c61a0751
Author: tonycox 
Date:   2016-12-26T09:10:43Z

change Graph.<init> to protected




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3018: [FLINK-5336] Remove IOReadableWritable interface f...

2017-01-08 Thread tonycox
Github user tonycox closed the pull request at:

https://github.com/apache/flink/pull/3018


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-03-06 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3127
  
@StephanEwen @twalthr What do you think about this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3040: [FLINK-3850] Add forward field annotations to DataSet

2017-03-06 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3040
  
@fhueske @KurtYoung @twalthr What do you think about this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3297: [FLINK-5431] Add configurable timePattern for client akka...

2017-03-06 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3297
  
@zentol What do you think about this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3166: [FLINK-3849] Add FilterableTableSource interface and Rule...

2017-03-09 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3166
  
Hi @fhueske, I can't continue on this PR, I don't have enough time for now. If 
you need an implementation of it immediately, I will unassign.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-13 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105600606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
--- End diff --

@fhueske I think

[GitHub] flink issue #3040: [FLINK-3850] Add forward field annotations to DataSet

2017-03-13 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3040
  
@fhueske I fixed the commented points.
Could you explain the join key forwarding?
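
For context, a hedged sketch of what forwarded-field declarations look like on a DataSet operator (the tuple and fields here are illustrative):

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((1, "a", 2L))

// "_1" declares that input field _1 reaches output field _1 unchanged;
// "_3->_2" declares that input field _3 becomes output field _2.
val mapped = ds
  .map(t => (t._1, t._3))
  .withForwardedFields("_1", "_3->_2")
```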


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-14 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r105972317
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,34 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val DATE = JTypes.DATE
+  val TIME = JTypes.TIME
+  val TIMESTAMP = JTypes.TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  def ROW(types: TypeInformation[_]*) = JTypes.ROW(types: _*)
+
+  def ROW(fieldNames: Array[String], types: TypeInformation[_]*) =
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-14 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r105972293
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class Types {
+
+   public static final BasicTypeInfo<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
+   public static final BasicTypeInfo<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   public static final BasicTypeInfo<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
+   public static final BasicTypeInfo<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
+   public static final BasicTypeInfo<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
+   public static final BasicTypeInfo<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
+   public static final BasicTypeInfo<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO;
+   public static final BasicTypeInfo<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
+   public static final BasicTypeInfo<BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+
+   public static final SqlTimeTypeInfo<Date> DATE = SqlTimeTypeInfo.DATE;
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3297: [FLINK-5431] Add configurable timePattern for clie...

2017-03-15 Thread tonycox
Github user tonycox closed the pull request at:

https://github.com/apache/flink/pull/3297


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3269: [FLINK-5698] Add NestedFieldsProjectableTableSourc...

2017-03-15 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3269#discussion_r106208275
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramProjectExtractor.scala
 ---
@@ -84,6 +108,49 @@ object RexProgramProjectExtractor {
 }
 
 /**
+  * A RexVisitor to extract used nested input fields
+  */
+class RefFieldAccessorVisitor(
+names: List[String],
+usedFields: Array[Int]) extends RexVisitorImpl[Unit](true) {
+
+  private val projectedFields = new util.ArrayList[Array[String]]
+
+  names.foreach { n =>
+projectedFields.add(Array.empty)
+  }
+
+  private val order: Map[Int, Int] = 
names.indices.zip(usedFields).map(_.swap).toMap
--- End diff --

exactly
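
To make the mapping concrete, a tiny standalone sketch (values invented): `usedFields` holds the original input indices of the projected fields, and `order` maps each original index back to its position in the projection.

```scala
val usedFields = Array(4, 1, 7)   // original input field indices
val names = List("a", "b", "c")   // projected field names, in projection order

val order: Map[Int, Int] = names.indices.zip(usedFields).map(_.swap).toMap
// order == Map(4 -> 0, 1 -> 1, 7 -> 2)
```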


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

2017-03-15 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3269
  
Hi @fhueske, I've addressed all the comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-03-15 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3127
  
Hi @StephanEwen 
I added the Javadoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

2017-03-17 Thread tonycox
Github user tonycox closed the pull request at:

https://github.com/apache/flink/pull/3166


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106647040
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

You are right. Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106668425
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

A compilation error will come up if we don't use it:
```
Error:(62, 7) double definition:
method ROW:(fields: (String, 
org.apache.flink.api.common.typeinfo.TypeInformation[_])*)org.apache.flink.api.java.typeutils.RowTypeInfo
 and
method ROW:(types: 
org.apache.flink.api.common.typeinfo.TypeInformation[_]*)org.apache.flink.api.java.typeutils.RowTypeInfo
 at line 52
have same type after erasure: (fields: 
Seq)org.apache.flink.api.java.typeutils.RowTypeInfo
  def ROW(fields: (String, TypeInformation[_])*) = {
```
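
As a standalone illustration of that clash (Flink types swapped for plain ones, so this is only a sketch of the mechanism): both varargs overloads erase to `ROW(Seq)`, and the implicit `Manifest` parameter list gives the first overload a distinct signature after erasure.

```scala
object ErasureDemo {
  // Without the implicit parameter list these two would be a "double
  // definition": varargs element types are erased, so both methods
  // would end up as ROW(Seq).
  def ROW[T](types: Int*)(implicit m: Manifest[T]): Seq[Int] = types
  def ROW(fields: (String, Int)*): Seq[Int] = fields.map(_._2)
}

// Usage: ErasureDemo.ROW(1, 2) and ErasureDemo.ROW("a" -> 1, "b" -> 2)
```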


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-20 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106862331
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

I think it's good to keep the same name, but I can rename it as you suggest


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

2017-03-23 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3269
  
@fhueske What do you think about this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-03-23 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3127
  
@StephanEwen I fixed it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-04-25 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3760
  
@ramkrish86 Hi, thank you for the PR. Could you re-push the commit?
```
git commit --amend
git push origin FLINK-5752 -f
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection

2017-01-16 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3127

[FLINK-5481] Add type extraction from collection

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink fromRowCollection

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3127.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3127


commit 9331999e2ff3b497f4414342e0602b0ea682c450
Author: tonycox 
Date:   2017-01-13T17:08:47Z

[FLINK-5481] Add type extraction from collection




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #243: [FLINK-1293] Add support for out-of-place aggregations

2017-01-16 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/243
  
Is this PR still alive?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #243: [FLINK-1293] Add support for out-of-place aggregations

2017-01-16 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/243
  
Ok, cool. I can go through the abandoned PRs and add them to 
https://issues.apache.org/jira/browse/FLINK-5384. Would you mind, @fhueske?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3127: [FLINK-5481] Add type extraction from collection

2017-01-17 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3127
  
@zentol I rewrote it as you pointed out


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3149
  
You need to recompile the `TableSource` trait manually and implement 
`DefinedFieldNames` in `HBaseTableSource`
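
A rough sketch of what that could look like (hedged: the trait members and package paths here are assumptions based on the flink-table sources of that time, not taken from this thread):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.sources.{BatchTableSource, DefinedFieldNames}
import org.apache.flink.types.Row

// Hypothetical HBase-backed source; only the DefinedFieldNames part matters here.
class MyHBaseTableSource(returnType: TypeInformation[Row])
    extends BatchTableSource[Row] with DefinedFieldNames {

  override def getReturnType: TypeInformation[Row] = returnType

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = ??? // scan HBase here

  // Expose explicit field names/positions instead of the defaults:
  override def getFieldNames: Array[String] = Array("f1:q1", "f1:q2")
  override def getFieldIndices: Array[Int] = Array(0, 1)
}
```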


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3149
  
@ramkrish86 @fhueske what do you think about throwing `Tuple` (`T extends 
Tuple`) out of `org.apache.flink.addons.hbase.TableInputFormat` and implementing 
that abstract class in your `HBaseTableSourceInputFormat`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3149
  
I think this can be solved in a separate issue that provides a new API in 
`table.api` for selecting from HBase


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3149
  
And could you also extend it with `StreamTableSource`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96665755
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import scala.tools.cmd.gen.AnyVals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @Test
+   public void testHBaseTableSource() throws Exception {
+   // create a table with single region
+   TableName tableName = TableName.valueOf("test");
+   createTable(tableName, F_1, new byte[1][]);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19992l));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(102));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19993l));
--- End diff --

same as above


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96665689
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import scala.tools.cmd.gen.AnyVals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @Test
+   public void testHBaseTableSource() throws Exception {
+   // create a table with single region
+   TableName tableName = TableName.valueOf("test");
+   createTable(tableName, F_1, new byte[1][]);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19991l));
--- End diff --

I think there should be `Q_3`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96665731
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import scala.tools.cmd.gen.AnyVals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @Test
+   public void testHBaseTableSource() throws Exception {
+   // create a table with single region
+   TableName tableName = TableName.valueOf("test");
+   createTable(tableName, F_1, new byte[1][]);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19992l));
--- End diff --

same as above


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96668950
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.conf == null) {
+  

[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3149
  
We need to discuss that


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r9157
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import scala.tools.cmd.gen.AnyVals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @Test
+   public void testHBaseTableSource() throws Exception {
+   // create a table with single region
+   TableName tableName = TableName.valueOf("test");
+   createTable(tableName, F_1, new byte[1][]);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19992l));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(102));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_2, Bytes.toBytes(19993l));
+   puts.add(put);
+   // add the mutations to the table
+   table.put(puts);
+   table.close();
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment

[GitHub] flink pull request #3166: [FLINK-3849] [WIP] Add FilterableTableSource inter...

2017-01-18 Thread tonycox
GitHub user tonycox opened a pull request:

https://github.com/apache/flink/pull/3166

[FLINK-3849] [WIP] Add FilterableTableSource interface and Rules for 
pushing it

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

I have some questions:
1) how to mock `TableScan` and `RelBuilder` in 
`#testRewriteRexProgramWithCondition`
2) how to show the filter predicate in `#explainTerms`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tonycox/flink filterableSource

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3166


commit 73448a0425cb53f6b9db0bef5200537e2703a8b0
Author: tonycox 
Date:   2017-01-13T17:08:47Z

[FLINK-5481] Add type extraction from collection

commit 0d7e86f6e37ded863f2b930e2d8ebba9a8fc1c07
Author: tonycox 
Date:   2017-01-17T15:31:15Z

search types by columns

commit 4f38c69c1a307d1910f8c4ad811fb32509248880
Author: tonycox 
Date:   2017-01-11T09:15:49Z

[FLINK-3849] Add FilterableTableSource interface and Rules for pushing it




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96802576
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.conf == null) {
+  

[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/3149
  
As Jark Wu said in [jira](https://issues.apache.org/jira/browse/FLINK-5554) 

> I think the HBaseTableSource should return a composite type (with column 
family and qualifier), and we can get columns by composite type accessing.
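
A hedged sketch of such a composite (nested) row type, built with the two-argument `RowTypeInfo` constructor; the family/qualifier names mirror the tests above and are illustrative only:

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

// One nested row per column family; qualifiers become nested fields.
val f1 = new RowTypeInfo(
  Array[TypeInformation[_]](
    BasicTypeInfo.INT_TYPE_INFO,     // q1
    BasicTypeInfo.STRING_TYPE_INFO,  // q2
    BasicTypeInfo.LONG_TYPE_INFO),   // q3
  Array("q1", "q2", "q3"))

val tableType = new RowTypeInfo(Array[TypeInformation[_]](f1), Array("f1"))
// A query could then address columns as nested fields, e.g. SELECT test.f1.q1 FROM test
```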


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-20 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97067569
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
+ */
+public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private Configuration conf;
+   private String tableName;
+   private byte[] rowKey;
+   private String[] colNames;
+   private TypeInformation[] colTypes;
+
+   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
+   TypeInformation[] 
colTypes) {
+   this.conf = conf;
+   this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
+   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
--- End diff --

Try to use `.sql("SELECT test.f1.q1, test.f1.q2t")` or the Table API instead. I 
think there is a problem with nested Rows while scanning


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-20 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97074517
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
+ */
+public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+   private Configuration conf;
+   private String tableName;
+   private byte[] rowKey;
+   private String[] colNames;
+   private TypeInformation[] colTypes;
+
+   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
+   TypeInformation[] 
colTypes) {
+   this.conf = conf;
+   this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
+   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
--- End diff --

` f0~fn-1 `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97904966
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
+   // create a table with a single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, By
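
The archived message is cut off at this point. Judging from the ROW_1 and ROW_2 blocks above, the remaining setup presumably repeats the same pattern; the sketch below uses made-up values, since the originals are not recoverable from the truncation.

```java
// Hypothetical continuation of the truncated snippet; values are placeholders.
put.addColumn(F_1, Q_3, Bytes.toBytes(19992L)); // 3rd qual is long
puts.add(put);

put = new Put(ROW_3);
put.addColumn(F_1, Q_1, Bytes.toBytes(102));         // 1st qual is integer
put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2")); // 2nd qual is String
put.addColumn(F_1, Q_3, Bytes.toBytes(19993L));      // 3rd qual is long
puts.add(put);
```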

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97900030
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase table's schema.
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the one in java.sql
--- End diff --

I think, if it's used only in the Table API, java.sql.Date would be the better choice,
because Calcite uses the SQL date type.
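
A minimal sketch of that choice, illustrative only: it assumes dates are stored in HBase cells as epoch milliseconds, and reuses the per-family Pair-of-qualifier-and-type layout reconstructed above. `DateColumnDemo` and the qualifier name `q4` are hypothetical.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the PR's code.
public class DateColumnDemo {
	public static void main(String[] args) {
		// Register a qualifier whose declared type is java.sql.Date.
		Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
			new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
		List<Pair<String, TypeInformation<?>>> cols =
			new ArrayList<Pair<String, TypeInformation<?>>>();
		cols.add(new Pair<String, TypeInformation<?>>(
			"q4", TypeExtractor.getForClass(Date.class)));
		familyMap.put("f1", cols);

		// Decode an HBase cell, assuming the date was stored as epoch millis.
		byte[] cell = Bytes.toBytes(1485302400000L); // made-up value
		Date sqlDate = new Date(Bytes.toLong(cell));
		System.out.println(sqlDate);
	}
}
```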



