[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1820


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-29 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-202816038
  
merging this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-24 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-200942437
  
I've rebased on master and will merge once travis turns green :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-24 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-200802410
  
Thanks for the update! +1 to merge :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-24 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-200784086
  
Thanks. I've addressed you comments!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-23 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-200424618
  
Proposed a few refactorings. Looks good otherwise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57190990
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalTableScan
+import 
org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamSource
+
+class DataStreamScanRule
+  extends ConverterRule(
+classOf[LogicalTableScan],
+Convention.NONE,
+DataStreamConvention.INSTANCE,
+"DataStreamScanRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
+val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
--- End diff --

remove 1 new line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57190736
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, BiRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Flink RelNode which matches along with Union.
+  *
+  */
+class DataStreamUnion(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+opName: String)
--- End diff --

`DataSetUnion` was updated to not use the `opName` anymore. Would be good 
to reflect these changes also in `DataStreamUnion`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57190463
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.plan.nodes.FlinkCalc
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Flink RelNode which matches along with FlatMapOperator.
+  *
+  */
+class DataStreamCalc(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+rowType: RelDataType,
+calcProgram: RexProgram,
+opName: String,
--- End diff --

`opName` is no longer used and can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57188635
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
 ---
@@ -20,15 +20,15 @@ package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.plan.nodes.FlinkRel
 
 import scala.collection.JavaConversions._
 
-trait DataSetRel extends RelNode {
+trait DataSetRel extends RelNode with FlinkRel{
--- End diff --

space after `FlinkRel`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57188121
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction, 
RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.{GeneratedFunction, 
CodeGenerator}
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+trait FlinkCalc {
+
+  def functionBody(
--- End diff --

make as well `private[flink]`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57187905
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction, 
RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.{GeneratedFunction, 
CodeGenerator}
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+trait FlinkCalc {
+
+  def functionBody(
+generator: CodeGenerator,
+inputType: TypeInformation[Any],
+rowType: RelDataType,
+calcProgram: RexProgram,
+config: TableConfig,
+expectedType: Option[TypeInformation[Any]]): String = {
+
+val returnType = determineReturnType(
+  rowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+val condition = calcProgram.getCondition
+val expandedExpressions = calcProgram.getProjectList.map(
+  expr => calcProgram.expandLocalRef(expr))
+val projection = generator.generateResultExpression(
+  returnType,
+  rowType.getFieldNames,
+  expandedExpressions)
+
+  // only projection
+  if (condition == null) {
+s"""
+  |${projection.code}
+  |${generator.collectorTerm}.collect(${projection.resultTerm});
+  |""".stripMargin
+  }
+  else {
+val filterCondition = generator.generateExpression(
+  calcProgram.expandLocalRef(calcProgram.getCondition))
+// only filter
+if (projection == null) {
+  // conversion
+  if (inputType != returnType) {
+val conversion = generator.generateConverterResultExpression(
+  returnType,
+  rowType.getFieldNames)
+
+s"""
+  |${filterCondition.code}
+  |if (${filterCondition.resultTerm}) {
+  |  ${conversion.code}
+  |  
${generator.collectorTerm}.collect(${conversion.resultTerm});
+  |}
+  |""".stripMargin
+  }
+  // no conversion
+  else {
+s"""
+  |${filterCondition.code}
+  |if (${filterCondition.resultTerm}) {
+  |  
${generator.collectorTerm}.collect(${generator.input1Term});
+  |}
+  |""".stripMargin
+  }
+}
+// both filter and projection
+else {
+  s"""
+|${filterCondition.code}
+|if (${filterCondition.resultTerm}) {
+|  ${projection.code}
+|  
${generator.collectorTerm}.collect(${projection.resultTerm});
+|}
+|""".stripMargin
+}
+  }
+}
+
+  private[flink] def calcMapFunction(
+  genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): 
RichFlatMapFunction[Any, Any] = {
+
+new FlatMapRunner[Any, Any](
+  genFunction.name,
+  genFunction.code,
+  genFunction.returnType)
+  }
+
+  private[flink] def conditionToString(
+  calcProgram: RexProgram,
+  expression: (RexNode, List[String], Option[List[RexNode]]) => 
String): String = {
+
+val cond = calcProgram.getCondition
+val inFields = 

[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-19747
  
I have updated the PR. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-199881323
  
Thanks for the review! I'll see how we can share the common parts between 
DataSet and DataStream translation. I will also open a JIRA for reworking the 
tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-199877286
  
I think we should try to share more code between the DataSet and DataStream 
translation. Otherwise it looks good. 

We should also rework the tests and extract tests that check the Table API 
parts that are common for DataSet and DataStream. I wouldn't do that in this PR 
though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57010107
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.streaming.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.JavaConversions._
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
+import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
+
+class FilterITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

Add a test that checks correct exception if predicate attribute does not 
exist.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57008892
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.typeinfo.AtomicType
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.streaming.api.datastream.DataStream
+
+class DataStreamTable[T](
--- End diff --

Move code that is shared with `DataSetTable` into a common abstract 
`FlinkTable` class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57007432
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.streaming.api.datastream.DataStream
+import scala.collection.JavaConversions._
+
+trait DataStreamRel extends RelNode {
+
+  /**
+* Translates the FlinkRelNode into a Flink operator.
+*
+* @param config runtime configuration
+* @param expectedType specifies the type the Flink operator should 
return. The type must
+* have the same arity as the result. For instance, 
if the
+* expected type is a RowTypeInfo this method will 
return a DataSet of
+* type Row. If the expected type is Tuple2, the 
operator will return
+* a Tuple2 if possible. Row otherwise.
+* @return DataStream of type expectedType or RowTypeInfo
+*/
+  def translateToPlan(
+  config: TableConfig,
+  expectedType: Option[TypeInformation[Any]] = None)
+: DataStream[Any]
+
+  private[flink] def getExpressionString(
--- End diff --

Same as `DataSetRel.getExpressionString()`? Move to a `FlinkRel` trait?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1820#discussion_r57007059
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with FlatMapOperator.
+  *
+  */
+class DataStreamCalc(
--- End diff --

This class shares a lot of code with `DataSetCalc`. Can we extract common 
parts such generation of the FlatMapFunction and the name of the operator into 
a common `FlinkCalc` trait or so?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-19 Thread vasia
Github user vasia closed the pull request at:

https://github.com/apache/flink/pull/1770


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-19 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1770#issuecomment-198384176
  
I'm closing this PR because it's made against `tableOnCalcite` and I'll 
open a new one against `master`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-18 Thread vasia
GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/1820

[FLINK-3547] add support for streaming filter, project, and union in the 
Table API

This PR adds:

- Java and Scala stream translators
- methods to convert a table to a DataStream and vice versa
- streaming rules to FlinkRuleSets
- DataStream rules for calc, scan, and union
- tests and streaming test utils

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink streamTable-on-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1820.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1820


commit df2fd2dd28d88a3024e72e1a3ae58411435bf254
Author: vasia 
Date:   2016-02-29T14:18:23Z

[FLINK-3547] add support for DataStreamTable

- add Java and Scala stream translators

- add DataStream rules for calc and scan

- add tests and streaming test utils

- add support for streaming union

remove comments

commit ae4d5b62ca7e87ea1c25be15b1671d03bb6c5b71
Author: vasia 
Date:   2016-03-16T17:52:51Z

[FLINK-3547] move code generation from the calc rule to the calc node
and remove unnecessary rules




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-11 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1770#issuecomment-195302939
  
Rebased on current `tableOnCalcite`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-07 Thread vasia
GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/1770

[FLINK-3547] add support for streaming filter, project, and union int he 
Table API

This PR adds:

- Java and Scala stream translators
- methods to convert a table to a DataStream and vice versa
- streaming rules to FlinkRuleSets
- DataStream rules for calc, scan, and union
- tests and streaming test utils

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink streamTable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1770


commit 34be3ea6879348e0731e28f46ad017f5a833b96d
Author: vasia 
Date:   2016-02-29T14:18:23Z

[FLINK-3547] add support for DataStreamTable

- add FlinkStreamScanRule and FlinkStreamCalcRule

- add DataStream rules for calc and scan

- add tests and streaming test utils

- add support for streaming union




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---