[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3594




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111953023
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

+1




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111931445
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

+1, will change.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111930527
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

OK, will rename the packages.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111919363
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Yes, you are right. DataStream can only do `UNION ALL`. 
We should prevent the translation of `UNION` and also change the 
explainString to `union all`, IMO.
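A hypothetical sketch of such a guard, following the `matches()` pattern of the converter rules in this PR (rule placement assumed, not this PR's code):

    override def matches(call: RelOptRuleCall): Boolean = {
      // Union.all is false for plain UNION; only match UNION ALL so that a
      // distinct UNION fails translation instead of silently behaving like
      // DataStream.union.
      val union = call.rel(0).asInstanceOf[LogicalUnion]
      union.all
    }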




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111918921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+table: RelOptTable,
+val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+flinkTypeFactory.buildRowDataType(
+  TableEnvironment.getFieldNames(tableSource),
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(this)
+planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

I see, +1




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111918822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.rel.FlinkConventions
+
+class FlinkLogicalAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+indicator: Boolean,
+groupSet: ImmutableBitSet,
+groupSets: JList[ImmutableBitSet],
+aggCalls: JList[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  input: RelNode,
+  indicator: Boolean,
+  groupSet: ImmutableBitSet,
+  groupSets: JList[ImmutableBitSet],
+  aggCalls: JList[AggregateCall]): Aggregate = {
+new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val child = this.getInput
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+val aggCnt = this.aggCalls.size
+planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+}
+
+private class FlinkLogicalAggregateConverter
+  extends ConverterRule(
+classOf[LogicalAggregate],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalAggregateConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+!agg.containsDistinctCall()
--- End diff --

+1




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111918681
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

I'd like to keep the package name at `org.apache.flink.table.plan.nodes` to 
avoid another refactoring, but `logical` sounds better than `flinklogical`, IMO.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111917910
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+
+new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val leftRowCnt = metadata.getRowCount(getLeft)
+val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+val rightRowCnt = metadata.getRowCount(getRight)
+val rightRowSize = estimateRowSize(getRight.getRowType)
+
+val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+val cpuCost = leftRowCnt + rightRowCnt
+val rowCnt = leftRowCnt + rightRowCnt
+
+planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+classOf[LogicalJoin],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+val joinInfo = join.analyzeCondition
+
+hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

I see, thanks for the explanation. I agree, we need to keep the restriction 
here, to push the logical plan in the right direction. 

We might need different sets of logical optimization rules for batch and 
streaming at some point.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861591
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
 ---
@@ -18,58 +18,57 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
 /**
   * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
--- End diff --

I will remove this simple comment to be consistent with the other converter rules.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861037
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
 ---
@@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
 import java.lang.Double
 
+import org.apache.flink.table.rel.logical.FlinkLogicalCalc
+
 object FlinkRelMdRowCount extends RelMdRowCount {
 
-val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
-  BuiltInMethod.ROW_COUNT.method,
-  this)
+  val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
+BuiltInMethod.ROW_COUNT.method,
+this)
+
+  def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
--- End diff --

I'm ok with both, will change to `Calc`.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860657
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Actually, it was wrong of the old test to use `UNION` instead of `UNION ALL`. If I 
understand correctly, `UNION` does a global distinct over all fields, while 
`UNION ALL` just concatenates two datasets or datastreams. I think the behavior of 
`DataStream.union` is rather `UNION ALL` than `UNION`.
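For illustration, a minimal sketch of the semantic difference (assuming the 1.2-era Table API used by this PR; the environment setup is made up):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // t1 contains the row (1L, 1, "a") twice.
    val t1 = env.fromElements((1L, 1, "a"), (1L, 1, "a")).toTable(tEnv, 'a, 'b, 'c)
    tEnv.registerTable("t1", t1)

    // UNION deduplicates: the self-union yields the row once.
    val distinct = tEnv.sql("SELECT a, b, c FROM t1 UNION SELECT a, b, c FROM t1")

    // UNION ALL concatenates: the self-union yields the row four times,
    // matching the semantics of DataStream.union.
    val all = tEnv.sql("SELECT a, b, c FROM t1 UNION ALL SELECT a, b, c FROM t1")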




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+table: RelOptTable,
+val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+flinkTypeFactory.buildRowDataType(
+  TableEnvironment.getFieldNames(tableSource),
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(this)
+planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

We need to rewrite `explainTerms` and `toString` to make the RelNode's 
digest distinguishable from other `TableSourceScan`s. The default implementation 
only compares the table name, which is not enough when we push filters or 
projections into the `TableSource`. 

Actually, there was a bug in `toString` that was fixed in 
https://github.com/apache/flink/commit/697cc96106846547ff856aa5e478fee037ffde1a.
 I will backport it here.
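A minimal sketch of the idea (assumed shape, not the backported fix itself): append the field names to the digest so that two scans of the same table with different pushed-down projections are no longer considered equal:

    override def explainTerms(pw: RelWriter): RelWriter = {
      // TableScan's default digest only contains the table name; adding the
      // row type's field names separates scans after projection push-down.
      super.explainTerms(pw).item("fields", getRowType.getFieldNames.asScala.mkString(", "))
    }

    override def toString: String = {
      s"Scan(table: [${getTable.getQualifiedName.asScala.mkString(", ")}], " +
        s"fields: (${getRowType.getFieldNames.asScala.mkString(", ")}))"
    }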




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111859756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+child: RelNode,
+collation: RelCollation,
+offset: RexNode,
+fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+RexLiteral.intValue(offset)
+  } else {
+0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+RexLiteral.intValue(fetch) + limitStart
+  } else {
+Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+  traitSet: RelTraitSet,
+  newInput: RelNode,
+  newCollation: RelCollation,
+  offset: RexNode,
+  fetch: RexNode): Sort = {
+
+new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = (inputRowCnt - limitStart).max(1.0)
+  if (fetch != null) {
+val limit = RexLiteral.intValue(fetch)
+rowCount.min(limit)
+  } else {
+rowCount
+  }
+}
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+// by default, assume cost is proportional to number of rows
+val rowCount: Double = mq.getRowCount(this)
+planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

yes, these can be removed




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111858923
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.rel.FlinkConventions
+
+class FlinkLogicalAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+indicator: Boolean,
+groupSet: ImmutableBitSet,
+groupSets: JList[ImmutableBitSet],
+aggCalls: JList[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  input: RelNode,
+  indicator: Boolean,
+  groupSet: ImmutableBitSet,
+  groupSets: JList[ImmutableBitSet],
+  aggCalls: JList[AggregateCall]): Aggregate = {
+new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val child = this.getInput
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+val aggCnt = this.aggCalls.size
+planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+}
+
+private class FlinkLogicalAggregateConverter
+  extends ConverterRule(
+classOf[LogicalAggregate],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalAggregateConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+!agg.containsDistinctCall()
--- End diff --

This is similar to `FlinkLogicalJoin`: we need other logical rules to 
rewrite distinct aggregates first and then convert them to a "clean" 
`FlinkLogicalAggregate`. 
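For reference, Calcite ships a rule in this spirit; a sketch of wiring it into the logical rule set (the surrounding rule-set construction is assumed, not this PR's code) could look like:

    import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule
    import org.apache.calcite.tools.RuleSets

    // Expands DISTINCT aggregate calls (e.g. COUNT(DISTINCT a)) before
    // conversion, so FlinkLogicalAggregateConverter only ever sees
    // aggregates without distinct calls.
    val logicalOptRules = RuleSets.ofList(
      AggregateExpandDistinctAggregatesRule.INSTANCE
      // ... the remaining logical rules, including the converter rules
    )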




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857813
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

How about renaming `org.apache.flink.table.plan.node` to 
`org.apache.flink.table.plan.rel` and having 3 sub-packages: `logical`, `dataset` 
and `datastream`?




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+
+new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val leftRowCnt = metadata.getRowCount(getLeft)
+val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+val rightRowCnt = metadata.getRowCount(getRight)
+val rightRowSize = estimateRowSize(getRight.getRowType)
+
+val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+val cpuCost = leftRowCnt + rightRowCnt
+val rowCnt = leftRowCnt + rightRowCnt
+
+planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+classOf[LogicalJoin],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+val joinInfo = join.analyzeCondition
+
+hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

Currently the answer is yes, because the physical join can only support 
equality conditions on simple keys. 
If the join key is an expression like `a + 1 = b - 2` and we don't have this 
restriction in the logical layer, the join condition will stay in that form and we 
can't translate it to a physical join. There are 2 possible solutions:
1. Keep it this way.
2. Re-introduce rules which can extract expressions out of the join condition and 
add an additional "Calc" node to keep the join keys as simple fields. 
I prefer 1 for now, and will keep 2 in mind. After all, it's not very nice to 
let the logical layer know about all these restrictions. What do you think?
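For solution 2, Calcite's `JoinPushExpressionsRule` does roughly this kind of rewrite; a hypothetical sketch of the plan shape it would produce for `a + 1 = b - 2` (operator names illustrative only):

    // Before: LogicalJoin(condition: =(+($0, 1), -($1, 2)))
    //
    // After extraction, the join key is a plain field equality:
    //
    //   LogicalJoin(condition: =($1, $3))
    //     LogicalProject(a, $f1 = a + 1)   // left key pre-computed
    //     LogicalProject(b, $f1 = b - 2)   // right key pre-computed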




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111856357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

Yes, I will first test how the HepPlanner works in some test and production 
environments and then decide whether or how to change this stage to a rule-based 
planner. So this will be my follow-up task.
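A minimal sketch of such a rule-based pass (names taken from the diff above; the HepProgram wiring is an assumption, not this PR's code):

    import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
    import scala.collection.JavaConverters._

    // Apply the logical rule set exhaustively bottom-up instead of running
    // the cost-based Volcano planner for this stage.
    val hepProgram = new HepProgramBuilder()
      .addMatchOrder(HepMatchOrder.BOTTOM_UP)
      .addRuleCollection(logicalOptRuleSet.asScala.toList.asJava)
      .build()

    val hepPlanner = new HepPlanner(hepProgram)
    hepPlanner.setRoot(normalizedPlan)
    val logicalPlan = hepPlanner.findBestExp()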




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262356
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+child: RelNode,
+collation: RelCollation,
+offset: RexNode,
+fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+RexLiteral.intValue(offset)
+  } else {
+0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+RexLiteral.intValue(fetch) + limitStart
+  } else {
+Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+  traitSet: RelTraitSet,
+  newInput: RelNode,
+  newCollation: RelCollation,
+  offset: RexNode,
+  fetch: RexNode): Sort = {
+
+new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = (inputRowCnt - limitStart).max(1.0)
+  if (fetch != null) {
+val limit = RexLiteral.intValue(fetch)
+rowCount.min(limit)
+  } else {
+rowCount
+  }
+}
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+// by default, assume cost is proportional to number of rows
+val rowCount: Double = mq.getRowCount(this)
+planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

some of the other methods could be removed as well




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262449
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+table: RelOptTable,
+val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
+new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+flinkTypeFactory.buildRowDataType(
+  TableEnvironment.getFieldNames(tableSource),
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(this)
+planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

other `FlinkLogical` nodes do not implement `explainTerms()`




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262207
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+child: RelNode,
+collation: RelCollation,
+offset: RexNode,
+fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+RexLiteral.intValue(offset)
+  } else {
+0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+RexLiteral.intValue(fetch) + limitStart
+  } else {
+Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+  traitSet: RelTraitSet,
+  newInput: RelNode,
+  newCollation: RelCollation,
+  offset: RexNode,
+  fetch: RexNode): Sort = {
+
+new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = (inputRowCnt - limitStart).max(1.0)
+  if (fetch != null) {
+val limit = RexLiteral.intValue(fetch)
+rowCount.min(limit)
+  } else {
+rowCount
+  }
+}
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+// by default, assume cost is proportional to number of rows
+val rowCount: Double = mq.getRowCount(this)
+planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

the other `FlinkLogical` nodes do not implement `explainTerms()`




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262692
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Why did you change `UNION` to `UNION ALL`?




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109261556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+
+new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val leftRowCnt = metadata.getRowCount(getLeft)
+val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+val rightRowCnt = metadata.getRowCount(getRight)
+val rightRowSize = estimateRowSize(getRight.getRowType)
+
+val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+val cpuCost = leftRowCnt + rightRowCnt
+val rowCnt = leftRowCnt + rightRowCnt
+
+planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+classOf[LogicalJoin],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+val joinInfo = join.analyzeCondition
+
+hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

Do we need these checks here? Whether the join can be translated also depends 
on whether we are in a streaming or batch context. So we could just create the 
join and let the physical optimization decide whether it can be translated or not.




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109395373
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
 ---
@@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
 import java.lang.Double
 
+import org.apache.flink.table.rel.logical.FlinkLogicalCalc
+
 object FlinkRelMdRowCount extends RelMdRowCount {
 
-val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
-  BuiltInMethod.ROW_COUNT.method,
-  this)
+  val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
+BuiltInMethod.ROW_COUNT.method,
+this)
+
+  def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
--- End diff --

Can we change this to `def getRowCount(rel: Calc, mq: RelMetadataQuery): Double` to cover `DataSetCalc` and `FlinkLogicalCalc`, or would that be too generic?
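
A sketch of that generalization, assuming `Calc` is `org.apache.calcite.rel.core.Calc`, the common base class of `DataSetCalc` and `FlinkLogicalCalc` (the SOURCE provider registration from the diff above is elided):

    import java.lang.Double

    import org.apache.calcite.rel.core.Calc
    import org.apache.calcite.rel.metadata.{RelMdRowCount, RelMetadataQuery}

    object FlinkRelMdRowCount extends RelMdRowCount {
      // one handler covers logical and physical Calc nodes alike; the
      // reflective provider dispatches on the declared parameter type
      def getRowCount(rel: Calc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
    }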




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262780
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
 ---
@@ -85,8 +85,10 @@ class QueryDecorrelationTest extends TableTestBase {
   term("joinType", "InnerJoin")
 ),
 term("select", "empno0", "salary")
-  ),
-  term("groupBy", "empno0"),
+  )
+  ,
+  term("groupBy", "empno0")
--- End diff --

revert these changes?




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109261751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.rel.FlinkConventions
+
+class FlinkLogicalAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+indicator: Boolean,
+groupSet: ImmutableBitSet,
+groupSets: JList[ImmutableBitSet],
+aggCalls: JList[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  input: RelNode,
+  indicator: Boolean,
+  groupSet: ImmutableBitSet,
+  groupSets: JList[ImmutableBitSet],
+  aggCalls: JList[AggregateCall]): Aggregate = {
+new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+val child = this.getInput
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+val aggCnt = this.aggCalls.size
+planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+}
+
+private class FlinkLogicalAggregateConverter
+  extends ConverterRule(
+classOf[LogicalAggregate],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalAggregateConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+!agg.containsDistinctCall()
--- End diff --

Should we apply translation restrictions at the logical level? DataSet and DataStream might differ in their support for different types of operations. It might be better to let the physical optimization decide what is supported and what is not.
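
For illustration, a sketch of moving the check into a physical rule instead; the DataSet-side rule below is hypothetical and only marks where the restriction would land:

    import org.apache.calcite.plan.RelOptRuleCall
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.rel.convert.ConverterRule
    import org.apache.flink.table.plan.nodes.dataset.DataSetConvention
    import org.apache.flink.table.rel.FlinkConventions
    import org.apache.flink.table.rel.logical.FlinkLogicalAggregate

    // hypothetical rule: the logical converter's matches() would simply
    // return true, and the backend-specific restriction moves in here
    class DataSetAggregateRule
      extends ConverterRule(
        classOf[FlinkLogicalAggregate],
        FlinkConventions.LOGICAL,
        DataSetConvention.INSTANCE,
        "DataSetAggregateRule") {

      override def matches(call: RelOptRuleCall): Boolean = {
        val agg = call.rel(0).asInstanceOf[FlinkLogicalAggregate]
        // assume the DataSet backend rejects DISTINCT aggregates; the
        // DataStream rule could apply a different restriction
        !agg.containsDistinctCall()
      }

      override def convert(rel: RelNode): RelNode = ??? // translation elided in this sketch
    }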




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109253071
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
--- End diff --

Isn't the plan to use the rule-based `HepPlanner` instead of the `VolcanoPlanner` for logical optimization, or should switching be a follow-up step?
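
For reference, a minimal sketch of such a rule-based pass, assuming the rule set and plan from the diff above (the method name `runHepPlanner` is made up here):

    import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.tools.RuleSet

    import scala.collection.JavaConverters._

    def runHepPlanner(ruleSet: RuleSet, input: RelNode): RelNode = {
      val builder = new HepProgramBuilder
      builder.addMatchOrder(HepMatchOrder.BOTTOM_UP)
      // register every logical optimization rule with the program
      ruleSet.asScala.foreach(rule => builder.addRuleInstance(rule))
      val planner = new HepPlanner(builder.build)
      planner.setRoot(input)
      planner.findBestExp()
    }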




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109262077
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

move the `FlinkLogicalRels` to 
`org.apache.flink.table.plan.nodes.flinklogical`?




[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r109257615
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
 ---
@@ -18,58 +18,57 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
 /**
   * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
--- End diff --

update comment
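
e.g. something along the lines of:

    /**
      * Rule to convert a [[FlinkLogicalCorrelate]] into a [[DataSetCorrelate]].
      */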

