[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-07-06 Thread rtudoran
Github user rtudoran closed the pull request at:

https://github.com/apache/flink/pull/3714


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-25 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r113214399
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{ Byte => JByte, Integer => JInt, Long => JLong, Double => JDouble, Short => JShort, String => JString, Float => JFloat }
+import java.math.{ BigDecimal => JBigDecimal }
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It also encapsulates the implementation for ordering and generic interfaces.
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction]
+   * for sorting elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+    calcSort: LogicalSort,
+    inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+    val keySortFields = getSortFieldIndexList(calcSort)
+    val keySortDirections = getSortFieldDirectionList(calcSort)
+
+    val inputRowType =
+      FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+    val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections)
+
+    // drop time from comparison
+    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+    val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+    val rowComparator = createRowSortComparator(keyIndexesNoTime, orderingsNoTime)
+
+    new ProcTimeBoundedSortProcessFunction(
+      inputType.getFieldCount,
+      inputRowType,
+      rowComparator)
+
+  }
+
+  /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from
+   * [org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is done.
+   * First is expected to be the time
+   * @param orderings the [UntypedOrdering] objects
+   * @return a [java.util.Comparator] that compares rows on the given fields
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
--- End diff --

@fhueske It seems that RowComparator does not extend java.util.Comparator, 
so we need to wrap it in order to use it directly with the sorting algorithms 
from the collections library.
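
A minimal sketch of the wrapping idea described above, under stated assumptions; it is not code from the pull request. `FieldComparator`, `IntFieldComparator`, `ComparatorAdapter` and `sortRows` are hypothetical names, and a plain `Seq[Any]` stands in for Flink's `Row` so that the example runs without Flink on the classpath. The adapter simply delegates to the custom comparator so that `java.util.Collections.sort` can consume it.

```scala
import java.util.{ ArrayList, Collections, Comparator }

// Hypothetical stand-in for a comparator type that does NOT implement
// java.util.Comparator and only exposes its own comparison method.
trait FieldComparator {
  def compareFields(a: Seq[Any], b: Seq[Any]): Int
}

// Compares two rows on a single Int field, ascending or descending.
final class IntFieldComparator(index: Int, ascending: Boolean) extends FieldComparator {
  override def compareFields(a: Seq[Any], b: Seq[Any]): Int = {
    val cmp = Integer.compare(a(index).asInstanceOf[Int], b(index).asInstanceOf[Int])
    if (ascending) cmp else -cmp
  }
}

// Adapter: exposes the custom comparator through java.util.Comparator
// so that the Java collection sorting routines can use it.
final class ComparatorAdapter(delegate: FieldComparator) extends Comparator[Seq[Any]] {
  override def compare(a: Seq[Any], b: Seq[Any]): Int = delegate.compareFields(a, b)
}

object ComparatorAdapterExample {
  // Copies the rows into a Java list, sorts it with the given comparator
  // and returns the sorted rows as a Scala List.
  def sortRows(rows: Seq[Seq[Any]], cmp: Comparator[Seq[Any]]): List[Seq[Any]] = {
    val buffer = new ArrayList[Seq[Any]]()
    rows.foreach(r => buffer.add(r))
    Collections.sort(buffer, cmp)
    (0 until buffer.size).map(i => buffer.get(i)).toList
  }
}
```

Keeping the adapter separate from the comparison logic means the same wrapper could be reused for any comparator type that offers a two-argument compare method.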



[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-25 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r113143239
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSort(
+      sort,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowRelDataType,
+      inputType,
+      description + sort.getId())
+  }
+
+  override def toString: String = {
--- End diff --

@fhueske I have moved to the same format. I also created equivalent methods 
to support the format and currently put them in SortUtil. From my point of 
view, this static object can also be used as a utility by DataSetSort.
The question is whether you want me to modify DataSetSort in this merge 
to use these methods. It might make the commit messier, as it touches more 
classes. However

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-21 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112623715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{ Byte => JByte, Integer => JInt, Long => JLong, Double => JDouble, Short => JShort, String => JString, Float => JFloat }
+import java.math.{ BigDecimal => JBigDecimal }
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It also encapsulates the implementation for ordering and generic interfaces.
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction]
+   * for sorting elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+    calcSort: LogicalSort,
+    inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+    val keySortFields = getSortFieldIndexList(calcSort)
+    val keySortDirections = getSortFieldDirectionList(calcSort)
+
+    val inputRowType =
+      FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+    val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections)
+
+    // drop time from comparison
+    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+    val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+    val rowComparator = createRowSortComparator(keyIndexesNoTime, orderingsNoTime)
+
+    new ProcTimeBoundedSortProcessFunction(
+      inputType.getFieldCount,
+      inputRowType,
+      rowComparator)
+
+  }
+
+  /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from
+   * [org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is done.
+   * First is expected to be the time
+   * @param orderings the [UntypedOrdering] objects
+   * @return a [java.util.Comparator] that compares rows on the given fields
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
+    orderings: Array[UntypedOrdering]): Comparator[Row] = {
+
+    new SortRowComparator(orderings, keyIndex)
+  }
+
+  /**
+   * Function creates comparison objects with embedded type casting
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-21 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112623603
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{ Byte => JByte, Integer => JInt, Long => JLong, Double => JDouble, Short => JShort, String => JString, Float => JFloat }
+import java.math.{ BigDecimal => JBigDecimal }
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It also encapsulates the implementation for ordering and generic interfaces.
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction]
+   * for sorting elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+    calcSort: LogicalSort,
+    inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+    val keySortFields = getSortFieldIndexList(calcSort)
+    val keySortDirections = getSortFieldDirectionList(calcSort)
+
+    val inputRowType =
+      FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+    val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections)
+
+    // drop time from comparison
+    val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+    val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+    val rowComparator = createRowSortComparator(keyIndexesNoTime, orderingsNoTime)
+
+    new ProcTimeBoundedSortProcessFunction(
+      inputType.getFieldCount,
+      inputRowType,
+      rowComparator)
+
+  }
+
+  /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from
+   * [org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is done.
+   * First is expected to be the time
+   * @param orderings the [UntypedOrdering] objects
+   * @return a [java.util.Comparator] that compares rows on the given fields
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
+    orderings: Array[UntypedOrdering]): Comparator[Row] = {
+
+    new SortRowComparator(orderings, keyIndex)
+  }
+
+  /**
+   * Function creates comparison objects with embedded type casting
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-21 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112623175
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSort(
+      sort,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowRelDataType,
+      inputType,
+      description + sort.getId())
+  }
+
+  override def toString: String = {
+    s"Sort($sort)" +
+      s" on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("aggregate", sort)
+      .item("sort fields", sort.collation.getFieldCollations)
+      .itemIf("offset", sort.offset, sort.offset != null)
+      .itemIf("fetch", sort.fetch, sort.fetch != null)
+  .item("in
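
For reference, Scala applies the `s` interpolator per string literal, so when a message is split across concatenated literals, every literal that contains `${...}` needs its own prefix. The following is a minimal standalone sketch of that behaviour, not code from the pull request; `SortSpec` and both method names are hypothetical.

```scala
object InterpolationExample {
  // Hypothetical stand-in for the sort node being described.
  final case class SortSpec(name: String, fields: List[Int])

  // The second literal has no `s` prefix, so the text "${spec.fields}"
  // is emitted verbatim instead of being replaced by the field list.
  def describeWithoutPrefix(spec: SortSpec): String =
    s"Sort(${spec.name})" +
      " on fields: (${spec.fields})"

  // Both literals carry the `s` prefix, so both are interpolated.
  def describe(spec: SortSpec): String = {
    val fields = spec.fields.mkString(", ")
    s"Sort(${spec.name})" +
      s" on fields: ($fields)"
  }
}
```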

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-21 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112623029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSort(
+      sort,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowRelDataType,
+      inputType,
+      description + sort.getId())
+  }
+
+  override def toString: String = {
--- End diff --

OK, I will re-format. As for moving common things into a CommonSort, I would 
prefer to do this in a second pass, when I implement the sorting on rowtime 
(without offset/fetch). My logic is that at that point I can evaluate whether 
it actually makes sense to have the common class (i.e., whether there are 
enough methods). However, if you think it is the case, I can of course also 
move it right away (but this will complicate a 

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-21 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112622475
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", sort)
+  .item("sort fields",sort.collation.getFieldCollations)
+  .itemIf("offset", sort.offset, sort.offset!=null)
+  .itemIf("fetch", sort.fetch, sort.fetch!=null)
+  .item("in

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-21 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112622144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+extends ConverterRule(
+  classOf[LogicalSort],
+  Convention.NONE,
+  DataStreamConvention.INSTANCE,
+  "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+super.matches(call)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val calc: LogicalSort = rel.asInstanceOf[LogicalSort]
+val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(calc.getInput(0), DataStreamConvention.INSTANCE)
+
+val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
--- End diff --

@fhueske True, my expectation was that this returns the same type. Should I
just use `getOriginal.getRowType`?




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112212687
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, 
TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import 
java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic 
interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates 
[org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+
+val inputRowType = 
FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val orderings = createOrderingComparison(inputType, keySortFields, 
keySortDirections)
+
+//drop time from comparison
+val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+val rowComparator = 
createRowSortComparator(keyIndexesNoTime,orderingsNoTime)
+
+new ProcTimeBoundedSortProcessFunction(
+  inputType.getFieldCount,
+  inputRowType,
+  rowComparator)
+
+  }
+
+   /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from 
[org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is 
done. 
+   * First is expected to be the time  
+   * @param orderings the [UntypedOrdering] objects 
+   * @return Array of ordering objects
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
+orderings:Array[UntypedOrdering]): Comparator[Row] = {
+  
+new SortRowComparator(orderings,keyIndex)
+  }
+  
+   /**
+   * Function creates comparison objects with embeded type casting 
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting 

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112200218
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, 
DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => 
DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, 
TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
--- End diff --

we should not copy the `LogicalSort` but its properties (collation, offset, 
and fetch)
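
A minimal sketch of what this suggestion could look like, written without a
Calcite dependency so that it runs on its own. Every name in it
(`FieldCollation`, `LogicalSortLike`, `StreamSortNode`, `fromLogical`) is a
hypothetical stand-in and not part of the PR or of Calcite; in the real node
the three properties would presumably be Calcite's own collation, offset and
fetch values.

```scala
// Hypothetical stand-ins for the Calcite types, so the sketch runs without Calcite.
object SortPropertiesSketch {

  sealed trait Direction
  case object Ascending extends Direction
  case object Descending extends Direction

  // Stand-in for one sort field and its direction.
  final case class FieldCollation(fieldIndex: Int, direction: Direction)

  // Stand-in for the logical sort node that the converter rule receives.
  final case class LogicalSortLike(
      collation: List[FieldCollation],
      offset: Option[Long],
      fetch: Option[Long])

  // The physical node stores only the three properties, not the logical node.
  final case class StreamSortNode(
      collation: List[FieldCollation],
      offset: Option[Long],
      fetch: Option[Long],
      description: String) {

    // Builds an explain-style string; offset and fetch appear only when set.
    def explain: String = {
      val fields = collation
        .map(c => s"${c.fieldIndex} ${c.direction}")
        .mkString(", ")
      val parts = List(s"sort fields: ($fields)") ++
        offset.map(o => s"offset: $o").toList ++
        fetch.map(f => s"fetch: $f").toList
      parts.mkString("Sort(", ", ", ")")
    }
  }

  // Extracts the properties once, at conversion time.
  def fromLogical(sort: LogicalSortLike, description: String): StreamSortNode =
    StreamSortNode(sort.collation, sort.offset, sort.fetch, description)

  def main(args: Array[String]): Unit = {
    val logical = LogicalSortLike(
      List(FieldCollation(0, Ascending), FieldCollation(2, Descending)),
      offset = None,
      fetch = Some(10L))
    println(fromLogical(logical, "DataStreamSortRule").explain)
  }
}
```

With this shape, the node no longer holds a reference to the logical plan
node, and a `copy` would only need to pass the same three values along.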




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112198072
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+extends ConverterRule(
+  classOf[LogicalSort],
+  Convention.NONE,
+  DataStreamConvention.INSTANCE,
+  "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+super.matches(call)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val calc: LogicalSort = rel.asInstanceOf[LogicalSort]
--- End diff --

`calc` -> `sort`




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112207628
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, 
DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => 
DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, 
TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", sort)
+  .item("sort fields",sort.collation.getFieldCollations)
+  .itemIf("offset", sort.offset, sort.offset!=null)
+  .itemIf("fetch", sort.fetch, sort.fetch!=null)
+  .item("inp

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112217315
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, 
TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import 
java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic 
interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates 
[org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+
+val inputRowType = 
FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val orderings = createOrderingComparison(inputType, keySortFields, 
keySortDirections)
+
+//drop time from comparison
+val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+val rowComparator = 
createRowSortComparator(keyIndexesNoTime,orderingsNoTime)
+
+new ProcTimeBoundedSortProcessFunction(
+  inputType.getFieldCount,
+  inputRowType,
+  rowComparator)
+
+  }
+
+   /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from 
[org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is 
done. 
+   * First is expected to be the time  
+   * @param orderings the [UntypedOrdering] objects 
+   * @return Array of ordering objects
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
+orderings:Array[UntypedOrdering]): Comparator[Row] = {
+  
+new SortRowComparator(orderings,keyIndex)
+  }
+  
+   /**
+   * Function creates comparison objects with embeded type casting 
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting 

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112220799
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, 
TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import 
java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic 
interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates 
[org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+
+val inputRowType = 
FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val orderings = createOrderingComparison(inputType, keySortFields, 
keySortDirections)
+
+//drop time from comparison
+val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+val rowComparator = 
createRowSortComparator(keyIndexesNoTime,orderingsNoTime)
+
+new ProcTimeBoundedSortProcessFunction(
+  inputType.getFieldCount,
+  inputRowType,
+  rowComparator)
+
+  }
+
+   /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from 
[org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is 
done. 
+   * First is expected to be the time  
+   * @param orderings the [UntypedOrdering] objects 
+   * @return Array of ordering objects
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
+orderings:Array[UntypedOrdering]): Comparator[Row] = {
+  
+new SortRowComparator(orderings,keyIndex)
+  }
+  
+   /**
+   * Function creates comparison objects with embeded type casting 
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting 

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112203345
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+extends ConverterRule(
+  classOf[LogicalSort],
+  Convention.NONE,
+  DataStreamConvention.INSTANCE,
+  "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+super.matches(call)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val calc: LogicalSort = rel.asInstanceOf[LogicalSort]
+val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(calc.getInput(0), DataStreamConvention.INSTANCE)
+
+val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+new DataStreamSort(
+  calc,
+  rel.getCluster,
+  traitSet,
+  convInput,
+  rel.getRowType,
+  inputRowType,
+  description + calc.getId())
--- End diff --

Pass `description` instead of `description + calc.getId()`.




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112225554
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.LinkedList
+import java.util.Comparator
+
+
+/**
+ * Process Function used for the aggregate in bounded proctime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param fieldCount Is used to indicate fields in the current element to forward
+ * @param inputType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation
+ */
+class ProcTimeBoundedSortProcessFunction(
+  private val fieldCount: Int,
+  private val inputType: TypeInformation[Row],
+  private val rowComparator:Comparator[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  private var stateEventsBuffer: ListState[Row] = _
+
+  override def open(config: Configuration) {
+val sortDescriptor = new ListStateDescriptor[Row]("sortState", inputType)
+stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
+  }
+
+  override def processElement(
+input: Row,
+ctx: ProcessFunction[Row, Row]#Context,
+out: Collector[Row]): Unit = {
+
+val currentTime = ctx.timerService.currentProcessingTime
+//buffer the event incoming event
+  
+//we accumulate the events as they arrive within the given proctime
+stateEventsBuffer.add(input)
+
+//deduplication of multiple registered timers is done automatically
+ctx.timerService.registerProcessingTimeTimer(currentTime + 1)  
+
+  }
+  
+  override def onTimer(
+timestamp: Long,
+ctx: ProcessFunction[Row, Row]#OnTimerContext,
+out: Collector[Row]): Unit = {
+
+var i = 0
+val sortList = new LinkedList[Row]
--- End diff --

Reuse a single `ArrayList` instead of allocating a new `LinkedList` on every timer call. This creates fewer objects.
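
A minimal sketch of the idea, using only the standard library rather than the Flink classes in this PR. `Rec`, `SortBuffer`, and `emitSorted` are hypothetical names for illustration; in the PR the buffer would be a field of the process function and `onTimer` would fill it from the list state.

```scala
import java.util.{ArrayList, Collections, Comparator}

// Hypothetical stand-in for a row; the PR sorts org.apache.flink.types.Row.
final case class Rec(key: Int, payload: String)

// Hypothetical helper: holds one ArrayList for its whole lifetime and
// clears it on every call instead of allocating a new list per timer.
class SortBuffer(comparator: Comparator[Rec]) {
  private val sortBuffer = new ArrayList[Rec]()

  def emitSorted(events: Iterable[Rec], emit: Rec => Unit): Unit = {
    sortBuffer.clear()
    events.foreach(e => sortBuffer.add(e))
    Collections.sort(sortBuffer, comparator)
    var i = 0
    while (i < sortBuffer.size()) {
      emit(sortBuffer.get(i))
      i += 1
    }
  }
}
```

Index-based access is cheap on an `ArrayList`, and `clear()` keeps the backing array, so repeated timer calls do not allocate list nodes.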




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112217063
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+
+val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections)
+
+//drop time from comparison
+val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+val rowComparator = createRowSortComparator(keyIndexesNoTime,orderingsNoTime)
+
+new ProcTimeBoundedSortProcessFunction(
+  inputType.getFieldCount,
+  inputRowType,
+  rowComparator)
+
+  }
+
+   /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from [org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is done.
+   * First is expected to be the time  
+   * @param orderings the [UntypedOrdering] objects 
+   * @return Array of ordering objects
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
--- End diff --

We can use Flink's `RowComparator` here and do not need to implement our own comparators:

```
def createRowComparator(
inputRowType: RelDataType,
fieldIdxs: Array[Int],
fieldComps: Array[TypeComparator[AnyRef]],
fieldOrders: Array[Boolean]): TypeComparator[Row] = {

  val rowComp

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112226928
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
 ---
@@ -280,6 +281,86 @@ class BoundedProcessingOverRangeProcessFunctionTest {
 testHarness.close()
 
   }
+  
+  
+  @Test
+  def testSortProcTimeHarnessPartitioned(): Unit = {
--- End diff --

Please move this test into a dedicated class, `ProcTimeSortProcessFunctionTest`.




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112207488
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", sort)
+  .item("sort fields",sort.collation.getFieldCollations)
+  .itemIf("offset", sort.offset, sort.offset!=null)
+  .itemIf("fetch", sort.fetch, sort.fetch!=null)
+  .item("inp

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112214518
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+
+val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections)
+
+//drop time from comparison
+val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+val rowComparator = createRowSortComparator(keyIndexesNoTime,orderingsNoTime)
+
+new ProcTimeBoundedSortProcessFunction(
+  inputType.getFieldCount,
+  inputRowType,
+  rowComparator)
+
+  }
+
+   /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from [org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is done.
+   * First is expected to be the time  
+   * @param orderings the [UntypedOrdering] objects 
+   * @return Array of ordering objects
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
--- End diff --

Please move the first parameter onto a new line as well.
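
For illustration, the signature would then look roughly like the sketch below. `Row` and `UntypedOrdering` are replaced by hypothetical stand-in types so the snippet compiles on its own, and the body is a simplified placeholder, not the PR's `SortRowComparator`.

```scala
import java.util.Comparator

object SignatureStyle {
  // Hypothetical stand-ins for the PR's Row and UntypedOrdering types.
  type Row = Array[Any]
  type UntypedOrdering = Ordering[Any]

  // Every parameter, including the first, sits on its own line.
  def createRowSortComparator(
      keyIndex: Array[Int],
      orderings: Array[UntypedOrdering]): Comparator[Row] = {
    new Comparator[Row] {
      // Compare field by field and stop at the first non-equal key.
      override def compare(a: Row, b: Row): Int = {
        var i = 0
        while (i < keyIndex.length) {
          val c = orderings(i).compare(a(keyIndex(i)), b(keyIndex(i)))
          if (c != 0) return c
          i += 1
        }
        0
      }
    }
  }
}
```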



[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112198474
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+extends ConverterRule(
+  classOf[LogicalSort],
+  Convention.NONE,
+  DataStreamConvention.INSTANCE,
+  "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+super.matches(call)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val calc: LogicalSort = rel.asInstanceOf[LogicalSort]
+val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(calc.getInput(0), DataStreamConvention.INSTANCE)
+
+val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
--- End diff --

Why do we need to get the input type? A sort should not change the row 
type. Hence, input and output type should be the same.
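
A toy model of that argument, deliberately not using Calcite. `RowType`, `Node`, `Scan`, and `SortNode` are hypothetical names. The sort node takes its row type from its input, so no separate input type has to be passed in:

```scala
// Toy model, not Calcite: a sort reorders rows but never changes their schema.
final case class RowType(fieldNames: List[String])

sealed trait Node { def rowType: RowType }

final case class Scan(rowType: RowType) extends Node

// The sort node derives its row type from its input, so a separate
// inputRowType constructor argument would carry no extra information.
final case class SortNode(input: Node, sortFields: List[Int]) extends Node {
  override def rowType: RowType = input.rowType
}
```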




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112220586
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+
+val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections)
+
+//drop time from comparison
+val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+val rowComparator = createRowSortComparator(keyIndexesNoTime,orderingsNoTime)
+
+new ProcTimeBoundedSortProcessFunction(
+  inputType.getFieldCount,
+  inputRowType,
+  rowComparator)
+
+  }
+
+   /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from [org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is done.
+   * First is expected to be the time  
+   * @param orderings the [UntypedOrdering] objects 
+   * @return Array of ordering objects
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
+orderings:Array[UntypedOrdering]): Comparator[Row] = {
+  
+new SortRowComparator(orderings,keyIndex)
+  }
+  
+   /**
+   * Function creates comparison objects with embeded type casting 
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting 

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112226659
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 ---
@@ -29,6 +29,27 @@ class WindowAggregateTest extends TableTestBase {
   private val streamUtil: StreamTableTestUtil = streamTestUtil()
   streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
 
+  
+  
+  @Test
+  def testSortProcessingTime() = {
--- End diff --

I would add these tests to a new class `SortTest`. Please also add tests
that check that unsupported operations (descending time sort, time field not
the primary sort field, etc.) throw an exception.
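
A rough sketch of what such negative tests could check, written against a hypothetical stand-alone validator rather than the planner. `SortField`, `SortValidation`, and `rejects` are illustration-only names, and `IllegalArgumentException` stands in for whatever exception the planner actually throws (presumably `TableException`).

```scala
// Hypothetical model of a sort key; the PR reads these from the LogicalSort collation.
final case class SortField(index: Int, ascending: Boolean)

object SortValidation {
  // Rejects the two cases named above: the time field is not the primary
  // sort field, or the time field is sorted in descending order.
  def validate(fields: Seq[SortField], timeIndex: Int): Unit = {
    if (fields.isEmpty || fields.head.index != timeIndex) {
      throw new IllegalArgumentException("Time attribute must be the primary sort field.")
    }
    if (!fields.head.ascending) {
      throw new IllegalArgumentException("Sorting on the time attribute must be ascending.")
    }
  }

  // Test helper: true if evaluating the block throws IllegalArgumentException.
  def rejects(block: => Unit): Boolean =
    try { block; false } catch { case _: IllegalArgumentException => true }
}
```

In a JUnit-based `SortTest`, each rejected case would typically become its own test method annotated with the expected exception.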




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112227142
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.LinkedList
+import java.util.Comparator
+
+
+/**
+ * Process Function used for the aggregate in bounded proctime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param fieldCount Is used to indicate fields in the current element to forward
+ * @param inputType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation
+ */
+class ProcTimeBoundedSortProcessFunction(
--- End diff --

Why bounded? 
Rename to `ProcTimeSortProcessFunction`?




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112203273
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
--- End diff --

`description + sort.getId()` -> `description`
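
A minimal sketch of why this matters, assuming the planner may call `copy` more than once on the same node: appending the id on every copy makes the description grow each time, while passing it through keeps it stable. `SortNode` and its methods are hypothetical stand-ins, not Flink or Calcite classes.

```scala
// Hypothetical stand-in for a plan node; not Flink's or Calcite's API.
final case class SortNode(id: Int, description: String) {
  // Mirrors the quoted diff: every copy appends the id again.
  def copyAppendingId(): SortNode = SortNode(id, description + id)

  // Mirrors the suggestion: the description is passed through unchanged.
  def copyKeepingDescription(): SortNode = SortNode(id, description)
}

object DescriptionCopyDemo {
  val original: SortNode = SortNode(7, "DataStreamSortRule")

  // Two successive copies, as might happen during plan rewriting.
  val appendedTwice: SortNode = original.copyAppendingId().copyAppendingId()
  val keptTwice: SortNode = original.copyKeepingDescription().copyKeepingDescription()
}
```

With the appending variant the description ends up with the id repeated once per copy; with the pass-through variant it is identical to the original.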




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112208301
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", sort)
+  .item("sort fields",sort.collation.getFieldCollations)
+  .itemIf("offset", sort.offset, sort.offset!=null)
+  .itemIf("fetch", sort.fetch, sort.fetch!=null)
+  .item("inp

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112200587
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
--- End diff --

The input and output types of a sort are always the same. `rowRelDataType` 
should be sufficient.
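
A rough sketch of the idea, using hypothetical stand-in types (`RowType`, `Node`, `Scan`, `SortSketch`) rather than Calcite's `RelDataType` and `RelNode`: because a sort only reorders rows, a single row type parameter covers both sides, and the input schema can still be read from the input node if it is ever needed.

```scala
// Hypothetical stand-ins; not Calcite's RelDataType or RelNode.
final case class RowType(fieldNames: List[String])

trait Node {
  def rowType: RowType
}

final case class Scan(rowType: RowType) extends Node

// A sort reorders rows without changing their schema, so one type
// parameter is enough; no separate inputType argument is carried around.
final case class SortSketch(input: Node, rowRelDataType: RowType) extends Node {
  def rowType: RowType = rowRelDataType

  // If the input schema is needed, it can be taken from the input node.
  def inputRowType: RowType = input.rowType
}

object SortTypeDemo {
  val scan: Scan = Scan(RowType(List("proctime", "a", "b")))
  val sort: SortSketch = SortSketch(scan, scan.rowType)
}
```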




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112205545
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", sort)
+  .item("sort fields",sort.collation.getFieldCollations)
+  .itemIf("offset", sort.offset, sort.offset!=null)
+  .itemIf("fetch", sort.fetch, sort.fetch!=null)
+  .item("inp

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112212356
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.table.functions.Accumulator
+import java.util.{ List => JList, ArrayList }
+import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import java.util.Comparator
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import java.lang.{Byte=>JByte,Integer=>JInt,Long=>JLong,Double=>JDouble,Short=>JShort,String=>JString,Float=>JFloat}
+import java.math.{BigDecimal=>JBigDecimal}
+import org.apache.flink.api.common.functions.MapFunction
+
+/**
+ * Class represents a collection of helper methods to build the sort logic.
+ * It encapsulates as well the implementation for ordering and generic interfaces
+ */
+
+object SortUtil {
+
+  /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
+   * elements based on proctime and potentially other fields
+   * @param calcSort Sort logical object
+   * @param inputType input row type
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createProcTimeSortFunction(
+calcSort: LogicalSort,
+inputType: RelDataType): ProcessFunction[Row, Row] = {
+
+val keySortFields = getSortFieldIndexList(calcSort)
+val keySortDirections = getSortFieldDirectionList(calcSort)
+
+val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+
+val orderings = createOrderingComparison(inputType, keySortFields, keySortDirections)
+
+//drop time from comparison
+val keyIndexesNoTime = keySortFields.slice(1, keySortFields.size)
+val orderingsNoTime = orderings.slice(1, keySortFields.size)
+
+val rowComparator = createRowSortComparator(keyIndexesNoTime,orderingsNoTime)
+
+new ProcTimeBoundedSortProcessFunction(
+  inputType.getFieldCount,
+  inputRowType,
+  rowComparator)
+
+  }
+
+   /**
+   * Function creates a row comparator for the sorting fields based on
+   * [java.util.Comparator] objects derived from [org.apache.flink.api.common.TypeInfo]
+   * @param keyIndex the indexes of the fields on which the sorting is done. 
+   * First is expected to be the time  
+   * @param orderings the [UntypedOrdering] objects 
+   * @return Array of ordering objects
+   */
+  def createRowSortComparator(keyIndex: Array[Int],
+orderings:Array[UntypedOrdering]): Comparator[Row] = {
+  
+new SortRowComparator(orderings,keyIndex)
+  }
+  
+   /**
+   * Function creates comparison objects with embedded type casting 
+   * @param inputType input row type
+   * @param keyIndex the indexes of the fields on which the sorting 

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112208929
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", sort)
+  .item("sort fields",sort.collation.getFieldCollations)
+  .itemIf("offset", sort.offset, sort.offset!=null)
+  .itemIf("fetch", sort.fetch, sort.fetch!=null)
+  .item("inp

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112209644
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -0,0 +1,353 @@
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.java.typeutils.RowTypeInfo
--- End diff --

Remove unused imports




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112204008
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
--- End diff --

Please use the same formatting as `DataSetSort`. 
It might make sense to refactor common methods into a `CommonSort` class, 
similar to `CommonCalc`.
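
A minimal sketch of what such a shared helper could look like, written in plain Scala so it runs without Calcite or Flink on the classpath. Everything here is hypothetical: `CommonSortSketch`, `FieldCollation`, `Direction`, and the method names are stand-ins for illustration, not the existing `CommonCalc` API or the code in this PR.

```scala
object CommonSortSketch {
  // Hypothetical stand-ins for Calcite's field collation types.
  sealed trait Direction { def shortString: String }
  case object Ascending extends Direction { val shortString = "ASC" }
  case object Descending extends Direction { val shortString = "DESC" }
  final case class FieldCollation(fieldIndex: Int, direction: Direction)

  // Helpers that a batch and a streaming sort node could both mix in,
  // so that toString/explainTerms output is formatted the same way.
  trait CommonSort {
    // Renders the sort keys, for example "a ASC, b DESC".
    def sortFieldsToString(
        collations: Seq[FieldCollation],
        fieldNames: Seq[String]): String =
      collations
        .map(c => s"${fieldNames(c.fieldIndex)} ${c.direction.shortString}")
        .mkString(", ")

    // Renders the whole node; offset and fetch appear only when defined.
    def sortToString(
        collations: Seq[FieldCollation],
        fieldNames: Seq[String],
        offset: Option[Long],
        fetch: Option[Long]): String = {
      val fields = sortFieldsToString(collations, fieldNames)
      val off = offset.map(o => s", offset: $o").getOrElse("")
      val fet = fetch.map(f => s", fetch: $f").getOrElse("")
      s"Sort(by: ($fields)$off$fet)"
    }
  }

  // A node would mix the trait in; an object is used here only for the demo.
  object Demo extends CommonSort
}
```

With this shape, both node classes would delegate their string output to the trait instead of formatting it separately.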



[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112204101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

Same here: please use the same formatting as `DataSetSort`.



[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112198880
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet }
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan }
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+extends ConverterRule(
+  classOf[LogicalSort],
+  Convention.NONE,
+  DataStreamConvention.INSTANCE,
+  "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+super.matches(call)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val calc: LogicalSort = rel.asInstanceOf[LogicalSort]
+val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+val convInput: RelNode = RelOptRule.convert(calc.getInput(0), DataStreamConvention.INSTANCE)
+
+val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+new DataStreamSort(
+  calc,
--- End diff --

The other `DataStreamRel` nodes copy only the fields of the logical rel, 
not the logical rel itself. We should do the same here, 
i.e., copy the `RelCollation`, `offset`, and `fetch`.
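
As a rough illustration of that suggestion, the sketch below uses plain Scala stand-ins instead of the Calcite classes so that it runs on its own. `Collation`, `LogicalSortStub`, and `DataStreamSortStub` are hypothetical names, and modelling `offset` and `fetch` as `Option[Long]` is an assumption made only for this example.

```scala
object SortNodeSketch {
  // Hypothetical stand-ins for the Calcite types; not the real classes.
  final case class Collation(fieldIndexes: List[Int])
  final case class LogicalSortStub(
      collation: Collation,
      offset: Option[Long],
      fetch: Option[Long])

  // The physical node keeps only the values it needs, not the logical node.
  final class DataStreamSortStub(
      val collation: Collation,
      val offset: Option[Long],
      val fetch: Option[Long]) {

    // A copy is rebuilt from the node's own fields, so no reference to the
    // logical plan has to be carried along.
    def copyNode(): DataStreamSortStub =
      new DataStreamSortStub(collation, offset, fetch)
  }

  // What the converter rule's convert step would do under this design.
  def convert(logical: LogicalSortStub): DataStreamSortStub =
    new DataStreamSortStub(logical.collation, logical.offset, logical.fetch)
}
```

The constructor then takes the three values directly, which matches how the comment describes the other `DataStreamRel` nodes.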




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112202924
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
--- End diff --

Please remove unused imports




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112224889
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedSortProcessFunction.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.LinkedList
+import java.util.Comparator
+
+
+/**
+ * Process Function used for the aggregate in bounded proctime sort without offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param fieldCount Is used to indicate fields in the current element to forward
+ * @param inputType It is used to mark the type of the incoming data
+ * @param rowComparator the [[java.util.Comparator]] is used for this sort aggregation
+ */
+class ProcTimeBoundedSortProcessFunction(
+  private val fieldCount: Int,
+  private val inputType: TypeInformation[Row],
+  private val rowComparator:Comparator[Row])
--- End diff --

Please add a space after the colon: `rowComparator: Comparator[Row]`.
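
For readers unfamiliar with the `rowComparator` parameter, here is a small self-contained sketch of how a `java.util.Comparator` over rows can be assembled from sort keys with a direction. It is an illustration only: rows are modelled as `Vector[Int]` rather than Flink's `Row`, and `SortKey`, `rowComparator`, and `sortRows` are hypothetical names, not the helpers in this PR's `SortUtil`.

```scala
import java.util.Comparator

object RowComparatorSketch {
  // Assumption for the example: a row is a vector of Int fields.
  type Row = Vector[Int]

  // Hypothetical description of one ORDER BY key.
  final case class SortKey(fieldIndex: Int, ascending: Boolean)

  // Builds a comparator that checks the keys in order; the first key
  // on which the two rows differ decides the result.
  def rowComparator(keys: Seq[SortKey]): Comparator[Row] =
    new Comparator[Row] {
      override def compare(a: Row, b: Row): Int =
        keys.iterator
          .map { k =>
            val c = Integer.compare(a(k.fieldIndex), b(k.fieldIndex))
            if (k.ascending) c else -c
          }
          .find(_ != 0)
          .getOrElse(0)
    }

  // Sorts a batch of buffered rows with the comparator.
  def sortRows(rows: Seq[Row], keys: Seq[SortKey]): Seq[Row] = {
    val cmp = rowComparator(keys)
    rows.sortWith((a, b) => cmp.compare(a, b) < 0)
  }
}
```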




[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3714#discussion_r112206935
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.api.TableException
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
+import org.apache.calcite.rel.core.Sort
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil._
+
+/**
+  * Flink RelNode which matches along with Sort Rule.
+  *
+  */
+class DataStreamSort(
+  sort: LogicalSort,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputNode: RelNode,
+  rowRelDataType: RelDataType,
+  inputType: RelDataType,
+  description: String)
+extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamSort(
+  sort,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  rowRelDataType,
+  inputType,
+  description + sort.getId())
+  }
+
+  override def toString: String = {
+s"Sort($sort)" +
+  " on fields: (${sort.collation.getFieldCollations})"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("aggregate", sort)
+  .item("sort fields",sort.collation.getFieldCollations)
+  .itemIf("offset", sort.offset, sort.offset!=null)
+  .itemIf("fetch", sort.fetch, sort.fetch!=null)
+  .item("inp

[GitHub] flink pull request #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream ...

2017-04-12 Thread rtudoran
GitHub user rtudoran opened a pull request:

https://github.com/apache/flink/pull/3714

[FLINK-6075] - Support Limit/Top(Sort) for Stream SQL

Implement the sort based on process function

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-6075Re2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3714.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3714


commit 4e792eac7ff24992921f1750cd757ebf83dc97e2
Author: rtudoran 
Date:   2017-04-12T15:49:04Z

Add sort backbone support
Implement the sort based on process function
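
The general idea of a process-function-based sort can be sketched without Flink as follows. This is a simplified model under stated assumptions, not the PR's implementation: `SortBuffer`, `processElement`, and `onTimer` here are hypothetical stand-ins that only mimic the shape of a process function, and the buffer is a plain in-memory collection rather than Flink managed state.

```scala
import scala.collection.mutable

object BufferedSortSketch {
  // Buffers incoming elements and emits them in sorted order when a
  // timer fires, similar in spirit to a sort over one proctime instant.
  final class SortBuffer[T](ordering: Ordering[T]) {
    private val buffer = mutable.ArrayBuffer.empty[T]

    // Analogous to processElement: only store the element.
    def processElement(value: T): Unit = buffer += value

    // Analogous to onTimer: emit everything buffered so far in sorted
    // order, then clear the buffer for the next batch.
    def onTimer(emit: T => Unit): Unit = {
      buffer.sorted(ordering).foreach(emit)
      buffer.clear()
    }
  }
}
```

In a real job the buffer would have to live in checkpointed state so that it survives failures; that part is deliberately left out of the sketch.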



