[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1820 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-202816038 merging this --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-200942437 I've rebased on master and will merge once travis turns green :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-200802410 Thanks for the update! +1 to merge :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-200784086 Thanks. I've addressed you comments! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-200424618 Proposed a few refactorings. Looks good otherwise. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57190990 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamSource + +class DataStreamScanRule + extends ConverterRule( +classOf[LogicalTableScan], +Convention.NONE, +DataStreamConvention.INSTANCE, +"DataStreamScanRule") +{ + + def convert(rel: RelNode): RelNode = { +val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan] +val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) + --- End diff -- remove 1 new line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57190736 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter, BiRel} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with Union. + * + */ +class DataStreamUnion( +cluster: RelOptCluster, +traitSet: RelTraitSet, +left: RelNode, +right: RelNode, +rowType: RelDataType, +opName: String) --- End diff -- `DataSetUnion` was updated to not use the `opName` anymore. Would be good to reflect these changes also in `DataStreamUnion`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57190463 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.RexProgram +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.plan.nodes.FlinkCalc +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with FlatMapOperator. + * + */ +class DataStreamCalc( +cluster: RelOptCluster, +traitSet: RelTraitSet, +input: RelNode, +rowType: RelDataType, +calcProgram: RexProgram, +opName: String, --- End diff -- `opName` is no longer used and can be removed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57188635 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala --- @@ -20,15 +20,15 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.plan.nodes.FlinkRel import scala.collection.JavaConversions._ -trait DataSetRel extends RelNode { +trait DataSetRel extends RelNode with FlinkRel{ --- End diff -- space after `FlinkRel` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57188121 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.{GeneratedFunction, CodeGenerator} +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +trait FlinkCalc { + + def functionBody( --- End diff -- make as well `private[flink]`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57187905 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.{GeneratedFunction, CodeGenerator} +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +trait FlinkCalc { + + def functionBody( +generator: CodeGenerator, +inputType: TypeInformation[Any], +rowType: RelDataType, +calcProgram: RexProgram, +config: TableConfig, +expectedType: Option[TypeInformation[Any]]): String = { + +val returnType = determineReturnType( + rowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val condition = calcProgram.getCondition +val expandedExpressions = calcProgram.getProjectList.map( + expr => calcProgram.expandLocalRef(expr)) +val projection = generator.generateResultExpression( + returnType, + rowType.getFieldNames, + expandedExpressions) + + // only projection + if (condition == null) { +s""" + |${projection.code} + |${generator.collectorTerm}.collect(${projection.resultTerm}); + |""".stripMargin + } + else { +val filterCondition = generator.generateExpression( + calcProgram.expandLocalRef(calcProgram.getCondition)) +// only filter +if (projection == null) { + // conversion + if (inputType != returnType) { +val conversion = generator.generateConverterResultExpression( + returnType, + rowType.getFieldNames) + +s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${conversion.code} + | ${generator.collectorTerm}.collect(${conversion.resultTerm}); + |} + |""".stripMargin + } + // no conversion + else { +s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${generator.collectorTerm}.collect(${generator.input1Term}); + |} + |""".stripMargin + } +} +// both filter and projection +else { + s""" +|${filterCondition.code} +|if (${filterCondition.resultTerm}) { +| ${projection.code} +| ${generator.collectorTerm}.collect(${projection.resultTerm}); +|} +|""".stripMargin +} + } +} + + private[flink] def calcMapFunction( + genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = { + +new FlatMapRunner[Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + } + + private[flink] def conditionToString( + calcProgram: RexProgram, + expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = { + +val cond = calcProgram.getCondition +val inFields =
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-19747 I have updated the PR. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-199881323 Thanks for the review! I'll see how we can share the common parts between DataSet and DataStream translation. I will also open a JIRA for reworking the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-199877286 I think we should try to share more code between the DataSet and DataStream translation. Otherwise it looks good. We should also rework the tests and extract tests that check the Table API parts that are common for DataSet and DataStream. I wouldn't do that in this PR though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57010107 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.streaming.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.JavaConversions._ +import org.junit.Test +import org.junit.Assert._ +import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase +import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData + +class FilterITCase extends StreamingMultipleProgramsTestBase { --- End diff -- Add a test that checks correct exception if predicate attribute does not exist. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57008892 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.streaming.api.datastream.DataStream + +class DataStreamTable[T]( --- End diff -- Move code that is shared with `DataSetTable` into a common abstract `FlinkTable` class? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57007432 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rex._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.streaming.api.datastream.DataStream +import scala.collection.JavaConversions._ + +trait DataStreamRel extends RelNode { + + /** +* Translates the FlinkRelNode into a Flink operator. +* +* @param config runtime configuration +* @param expectedType specifies the type the Flink operator should return. The type must +* have the same arity as the result. For instance, if the +* expected type is a RowTypeInfo this method will return a DataSet of +* type Row. If the expected type is Tuple2, the operator will return +* a Tuple2 if possible. Row otherwise. +* @return DataStream of type expectedType or RowTypeInfo +*/ + def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]] = None) +: DataStream[Any] + + private[flink] def getExpressionString( --- End diff -- Same as `DataSetRel.getExpressionString()`? Move to a `FlinkRel` trait? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1820#discussion_r57007059 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala --- @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.RexProgram +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.datastream.DataStream + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with FlatMapOperator. + * + */ +class DataStreamCalc( --- End diff -- This class shares a lot of code with `DataSetCalc`. Can we extract common parts such generation of the FlatMapFunction and the name of the operator into a common `FlinkCalc` trait or so? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia closed the pull request at: https://github.com/apache/flink/pull/1770 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1770#issuecomment-198384176 I'm closing this PR because it's made against `tableOnCalcite` and I'll open a new one against `master`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/1820 [FLINK-3547] add support for streaming filter, project, and union in the Table API This PR adds: - Java and Scala stream translators - methods to convert a table to a DataStream and vice versa - streaming rules to FlinkRuleSets - DataStream rules for calc, scan, and union - tests and streaming test utils You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink streamTable-on-master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1820.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1820 commit df2fd2dd28d88a3024e72e1a3ae58411435bf254 Author: vasiaDate: 2016-02-29T14:18:23Z [FLINK-3547] add support for DataStreamTable - add Java and Scala stream translators - add DataStream rules for calc and scan - add tests and streaming test utils - add support for streaming union remove comments commit ae4d5b62ca7e87ea1c25be15b1671d03bb6c5b71 Author: vasia Date: 2016-03-16T17:52:51Z [FLINK-3547] move code generation from the calc rule to the calc node and remove unnecessary rules --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1770#issuecomment-195302939 Rebased on current `tableOnCalcite`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/1770 [FLINK-3547] add support for streaming filter, project, and union int he Table API This PR adds: - Java and Scala stream translators - methods to convert a table to a DataStream and vice versa - streaming rules to FlinkRuleSets - DataStream rules for calc, scan, and union - tests and streaming test utils You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink streamTable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1770.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1770 commit 34be3ea6879348e0731e28f46ad017f5a833b96d Author: vasiaDate: 2016-02-29T14:18:23Z [FLINK-3547] add support for DataStreamTable - add FlinkStreamScanRule and FlinkStreamCalcRule - add DataStream rules for calc and scan - add tests and streaming test utils - add support for streaming union --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---