http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
new file mode 100644
index 0000000..525d4d7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.slf4j.LoggerFactory
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param genAggregations Generated aggregate helper function
+  * @param intermediateType         the intermediate row type in which the state is saved
+  * @param inputType                the input row type in which the state is saved
+  */
+abstract class RowTimeUnboundedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType: TypeInformation[Row],
+    inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]
+    with Compiler[GeneratedAggregations] {
+
+  protected var output: Row = _
+  // state to hold the accumulators of the aggregations
+  private var accumulatorState: ValueState[Row] = _
+  // state to hold rows until the next watermark arrives
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  // list to sort timestamps to access rows in timestamp order
+  private var sortedTimestamps: util.LinkedList[Long] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  protected var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+                s"Code:\n$genAggregations.code")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = function.createOutputRow()
+    sortedTimestamps = new util.LinkedList[Long]()
+
+    // initialize accumulator state
+    val accDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
+    accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
+
+    // initialize row state
+    val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+  }
+
+  /**
+    * Puts an element from the input stream into state if it is not late.
+    * Registers a timer for the next watermark.
+    *
+    * @param input The input value.
+    * @param ctx   The context for registering timers and querying the current time
+    * @param out   The collector for returning result values.
+    *
+    */
+  override def processElement(
+     input: Row,
+     ctx:  ProcessFunction[Row, Row]#Context,
+     out: Collector[Row]): Unit = {
+
+    val timestamp = ctx.timestamp()
+    val curWatermark = ctx.timerService().currentWatermark()
+
+    // discard late records; only rows at or after the current watermark are processed
+    if (timestamp >= curWatermark) {
+      // registering the timer at (watermark + 1) ensures that every key
+      // registers at most one timer per watermark
+      ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+
+      // put row into state
+      var rowList = rowMapState.get(timestamp)
+      if (rowList == null) {
+        rowList = new util.ArrayList[Row]()
+      }
+      rowList.add(input)
+      rowMapState.put(timestamp, rowList)
+    }
+  }
+
+  /**
+    * Called when a watermark arrives.
+    * Sorts records according to their timestamps, computes aggregates, and emits all
+    * records with timestamps smaller than or equal to the watermark in timestamp order.
+    *
+    * @param timestamp The timestamp of the firing timer.
+    * @param ctx       The context for registering timers and querying the current time
+    * @param out       The collector for returning result values.
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[Row, Row]#OnTimerContext,
+      out: Collector[Row]): Unit = {
+
+    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
+    val collector = out.asInstanceOf[TimestampedCollector[Row]]
+
+    val keyIterator = rowMapState.keys.iterator
+    if (keyIterator.hasNext) {
+      val curWatermark = ctx.timerService.currentWatermark
+      var existEarlyRecord: Boolean = false
+
+      // sort the record timestamps
+      do {
+        val recordTime = keyIterator.next
+        // only take timestamps smaller/equal to the watermark
+        if (recordTime <= curWatermark) {
+          insertToSortedList(recordTime)
+        } else {
+          existEarlyRecord = true
+        }
+      } while (keyIterator.hasNext)
+
+      // get last accumulator
+      var lastAccumulator = accumulatorState.value
+      if (lastAccumulator == null) {
+        // initialize accumulator
+        lastAccumulator = function.createAccumulators()
+      }
+
+      // emit the rows in order
+      while (!sortedTimestamps.isEmpty) {
+        val curTimestamp = sortedTimestamps.removeFirst()
+        val curRowList = rowMapState.get(curTimestamp)
+        collector.setAbsoluteTimestamp(curTimestamp)
+
+        // process rows with the same timestamp; the mechanism differs between ROWS and RANGE
+        processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)
+
+        rowMapState.remove(curTimestamp)
+      }
+
+      accumulatorState.update(lastAccumulator)
+
+      // if there are rows with timestamp > watermark, register a timer for the next watermark
+      if (existEarlyRecord) {
+        ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+      }
+    }
+  }
+
+  /**
+   * Inserts timestamps in order into a linked list.
+   *
+   * If timestamps arrive in order (as is the case with the RocksDB state backend),
+   * this is just an O(1) append.
+   */
+  private def insertToSortedList(recordTimestamp: Long) = {
+    val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
+    var continue = true
+    while (listIterator.hasPrevious && continue) {
+      val timestamp = listIterator.previous
+      if (recordTimestamp >= timestamp) {
+        listIterator.next
+        listIterator.add(recordTimestamp)
+        continue = false
+      }
+    }
+
+    if (continue) {
+      sortedTimestamps.addFirst(recordTimestamp)
+    }
+  }
+
+  /**
+   * Processes rows with the same timestamp. The mechanism differs between
+   * ROWS and RANGE windows.
+   */
+  def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit
+
+}
+
+/**
+  * A ProcessFunction to support unbounded ROWS window.
+  * The ROWS clause defines on a physical level how many rows are included in a window frame.
+  */
+class RowTimeUnboundedRowsOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType: TypeInformation[Row],
+    inputType: TypeInformation[Row])
+  extends RowTimeUnboundedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType,
+    inputType) {
+
+  override def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit = {
+
+    var i = 0
+    while (i < curRowList.size) {
+      val curRow = curRowList.get(i)
+
+      // copy forwarded fields to output row
+      function.setForwardedFields(curRow, output)
+
+      // update accumulators and copy aggregates to output row
+      function.accumulate(lastAccumulator, curRow)
+      function.setAggregationResults(lastAccumulator, output)
+      // emit output row
+      out.collect(output)
+      i += 1
+    }
+  }
+}
+
+
+/**
+  * A ProcessFunction to support unbounded RANGE window.
+  * The RANGE option includes all the rows within the window frame
+  * that have the same ORDER BY values as the current row.
+  */
+class RowTimeUnboundedRangeOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType: TypeInformation[Row],
+    inputType: TypeInformation[Row])
+  extends RowTimeUnboundedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType,
+    inputType) {
+
+  override def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit = {
+
+    var i = 0
+    // all rows with the same timestamp get the same aggregation value,
+    // so accumulate all rows before emitting
+    while (i < curRowList.size) {
+      val curRow = curRowList.get(i)
+
+      function.accumulate(lastAccumulator, curRow)
+      i += 1
+    }
+
+    // emit output row
+    i = 0
+    while (i < curRowList.size) {
+      val curRow = curRowList.get(i)
+
+      // copy forwarded fields to output row
+      function.setForwardedFields(curRow, output)
+
+      // copy aggregates to output row
+      function.setAggregationResults(lastAccumulator, output)
+      out.collect(output)
+      i += 1
+    }
+  }
+}
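A note on insertToSortedList in the new file: because the scan starts at the tail of the
linked list, timestamps arriving in ascending order satisfy the first comparison
immediately, so the common case is an O(1) append and only fully out-of-order input
degrades to O(n) per insert. A minimal standalone sketch of the same logic (the object
and method names here are illustrative, not part of the commit):

    import java.util.LinkedList

    object SortedInsertDemo {
      // Mirrors insertToSortedList: scan backwards from the tail and insert the
      // new timestamp directly after the first element that is <= to it.
      def insertSorted(list: LinkedList[Long], ts: Long): Unit = {
        val it = list.listIterator(list.size)
        var continue = true
        while (it.hasPrevious && continue) {
          if (ts >= it.previous()) {
            it.next()   // step forward over the element we just compared
            it.add(ts)  // insert directly after it
            continue = false
          }
        }
        if (continue) list.addFirst(ts)  // smaller than everything seen so far
      }

      def main(args: Array[String]): Unit = {
        val l = new LinkedList[Long]()
        Seq(3L, 1L, 4L, 1L, 5L).foreach(insertSorted(l, _))
        println(l)  // [1, 1, 3, 4, 5]
      }
    }

Keeping a sorted LinkedList across watermarks avoids re-sorting all buffered timestamps
on every timer firing when the input is mostly ordered.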

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
deleted file mode 100644
index 4539164..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.util
-import java.util.{List => JList}
-
-import org.apache.flink.api.common.state._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
-import org.slf4j.LoggerFactory
-
-/**
- * Process Function for ROWS clause event-time bounded OVER window
- *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  * @param inputRowType             row type info of input row
-  * @param precedingOffset          preceding offset
- */
-class RowsClauseBoundedOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo,
-    inputRowType: RowTypeInfo,
-    precedingOffset: Long)
-  extends ProcessFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
-
-  Preconditions.checkNotNull(aggregationStateType)
-  Preconditions.checkNotNull(precedingOffset)
-
-  private var output: Row = _
-
-  // the state which keeps the last triggering timestamp
-  private var lastTriggeringTsState: ValueState[Long] = _
-
-  // the state which keeps the count of data
-  private var dataCountState: ValueState[Long] = _
-
-  // the state which used to materialize the accumulator for incremental calculation
-  private var accumulatorState: ValueState[Row] = _
-
-  // the state which keeps all the data that are not expired.
-  // The first element (as the mapState key) of the tuple is the time stamp. Per each time
-  // stamp, the second element of tuple is a list that contains the entire data of all the
-  // rows belonging to this time stamp.
-  private var dataState: MapState[Long, JList[Row]] = _
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
-                s"Code:\n$genAggregations.code")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = function.createOutputRow()
-
-    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
-      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
-    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
-
-    val dataCountStateDescriptor =
-      new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
-    dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
-
-    val accumulatorStateDescriptor =
-      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
-    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
-
-    val keyTypeInformation: TypeInformation[Long] =
-      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
-    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
-
-    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-      new MapStateDescriptor[Long, JList[Row]](
-        "dataState",
-        keyTypeInformation,
-        valueTypeInformation)
-
-    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
-  }
-
-  override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
-
-    // triggering timestamp for trigger calculation
-    val triggeringTs = ctx.timestamp
-
-    val lastTriggeringTs = lastTriggeringTsState.value
-    // check if the data is expired, if not, save the data and register event time timer
-
-    if (triggeringTs > lastTriggeringTs) {
-      val data = dataState.get(triggeringTs)
-      if (null != data) {
-        data.add(input)
-        dataState.put(triggeringTs, data)
-      } else {
-        val data = new util.ArrayList[Row]
-        data.add(input)
-        dataState.put(triggeringTs, data)
-        // register event time timer
-        ctx.timerService.registerEventTimeTimer(triggeringTs)
-      }
-    }
-  }
-
-  override def onTimer(
-    timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
-
-    // gets all window data from state for the calculation
-    val inputs: JList[Row] = dataState.get(timestamp)
-
-    if (null != inputs) {
-
-      var accumulators = accumulatorState.value
-      var dataCount = dataCountState.value
-
-      var retractList: JList[Row] = null
-      var retractTs: Long = Long.MaxValue
-      var retractCnt: Int = 0
-      var i = 0
-
-      while (i < inputs.size) {
-        val input = inputs.get(i)
-
-        // initialize when first run or failover recovery per key
-        if (null == accumulators) {
-          accumulators = function.createAccumulators()
-        }
-
-        var retractRow: Row = null
-
-        if (dataCount >= precedingOffset) {
-          if (null == retractList) {
-            // find the smallest timestamp
-            retractTs = Long.MaxValue
-            val dataTimestampIt = dataState.keys.iterator
-            while (dataTimestampIt.hasNext) {
-              val dataTs = dataTimestampIt.next
-              if (dataTs < retractTs) {
-                retractTs = dataTs
-              }
-            }
-            // get the oldest rows to retract them
-            retractList = dataState.get(retractTs)
-          }
-
-          retractRow = retractList.get(retractCnt)
-          retractCnt += 1
-
-          // remove retracted values from state
-          if (retractList.size == retractCnt) {
-            dataState.remove(retractTs)
-            retractList = null
-            retractCnt = 0
-          }
-        } else {
-          dataCount += 1
-        }
-
-        // copy forwarded fields to output row
-        function.setForwardedFields(input, output)
-
-        // retract old row from accumulators
-        if (null != retractRow) {
-          function.retract(accumulators, retractRow)
-        }
-
-        // accumulate current row and set aggregate in output row
-        function.accumulate(accumulators, input)
-        function.setAggregationResults(accumulators, output)
-        i += 1
-
-        out.collect(output)
-      }
-
-      // update all states
-      if (dataState.contains(retractTs)) {
-        if (retractCnt > 0) {
-          retractList.subList(0, retractCnt).clear()
-          dataState.put(retractTs, retractList)
-        }
-      }
-      dataCountState.update(dataCount)
-      accumulatorState.update(accumulators)
-    }
-
-    lastTriggeringTsState.update(timestamp)
-  }
-}
-
-
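The heart of the deleted (renamed) function above is the retraction bookkeeping in
onTimer: rows are counted until precedingOffset is reached, and from then on each newly
accumulated row first retracts the oldest buffered row, so the aggregate always covers a
fixed number of rows. A hedged standalone sketch of that mechanism, specialized to SUM
(the class name, parameter name, and exact offset semantics are assumptions for
illustration, not part of the commit):

    import scala.collection.mutable

    // Keep at most `windowSize` rows: retract the oldest value from the running
    // aggregate before each new row is accumulated, once the window is full.
    class BoundedSum(windowSize: Int) {
      private val window = mutable.Queue[Long]()
      private var sum = 0L

      def add(value: Long): Long = {
        if (window.size >= windowSize) {
          sum -= window.dequeue()  // retract oldest row, like function.retract(...)
        }
        window.enqueue(value)
        sum += value               // like function.accumulate(...)
        sum
      }
    }

    // val s = new BoundedSum(3)
    // Seq(1L, 2L, 3L, 4L).map(s.add)  // List(1, 3, 6, 9)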

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
deleted file mode 100644
index cca3e3f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.util
-import java.util.{List => JList}
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.util.{Collector, Preconditions}
-import org.apache.flink.api.common.state._
-import org.apache.flink.api.java.typeutils.ListTypeInfo
-import org.apache.flink.streaming.api.operators.TimestampedCollector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
-import org.slf4j.LoggerFactory
-
-
-/**
-  * A ProcessFunction to support unbounded event-time over-window
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param intermediateType         the intermediate row tye which the state saved
-  * @param inputType                the input row tye which the state saved
-  */
-abstract class UnboundedEventTimeOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
-
-  protected var output: Row = _
-  // state to hold the accumulators of the aggregations
-  private var accumulatorState: ValueState[Row] = _
-  // state to hold rows until the next watermark arrives
-  private var rowMapState: MapState[Long, JList[Row]] = _
-  // list to sort timestamps to access rows in timestamp order
-  private var sortedTimestamps: util.LinkedList[Long] = _
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  protected var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
-                s"Code:\n$genAggregations.code")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = function.createOutputRow()
-    sortedTimestamps = new util.LinkedList[Long]()
-
-    // initialize accumulator state
-    val accDescriptor: ValueStateDescriptor[Row] =
-      new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
-    accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
-
-    // initialize row state
-    val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
-    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
-        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
-    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
-  }
-
-  /**
-    * Puts an element from the input stream into state if it is not late.
-    * Registers a timer for the next watermark.
-    *
-    * @param input The input value.
-    * @param ctx   The ctx to register timer or get current time
-    * @param out   The collector for returning result values.
-    *
-    */
-  override def processElement(
-     input: Row,
-     ctx:  ProcessFunction[Row, Row]#Context,
-     out: Collector[Row]): Unit = {
-
-    val timestamp = ctx.timestamp()
-    val curWatermark = ctx.timerService().currentWatermark()
-
-    // discard late record
-    if (timestamp >= curWatermark) {
-      // ensure every key just registers one timer
-      ctx.timerService.registerEventTimeTimer(curWatermark + 1)
-
-      // put row into state
-      var rowList = rowMapState.get(timestamp)
-      if (rowList == null) {
-        rowList = new util.ArrayList[Row]()
-      }
-      rowList.add(input)
-      rowMapState.put(timestamp, rowList)
-    }
-  }
-
-  /**
-    * Called when a watermark arrived.
-    * Sorts records according the timestamp, computes aggregates, and emits all records with
-    * timestamp smaller than the watermark in timestamp order.
-    *
-    * @param timestamp The timestamp of the firing timer.
-    * @param ctx       The ctx to register timer or get current time
-    * @param out       The collector for returning result values.
-    */
-  override def onTimer(
-      timestamp: Long,
-      ctx: ProcessFunction[Row, Row]#OnTimerContext,
-      out: Collector[Row]): Unit = {
-
-    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
-    val collector = out.asInstanceOf[TimestampedCollector[Row]]
-
-    val keyIterator = rowMapState.keys.iterator
-    if (keyIterator.hasNext) {
-      val curWatermark = ctx.timerService.currentWatermark
-      var existEarlyRecord: Boolean = false
-
-      // sort the record timestamps
-      do {
-        val recordTime = keyIterator.next
-        // only take timestamps smaller/equal to the watermark
-        if (recordTime <= curWatermark) {
-          insertToSortedList(recordTime)
-        } else {
-          existEarlyRecord = true
-        }
-      } while (keyIterator.hasNext)
-
-      // get last accumulator
-      var lastAccumulator = accumulatorState.value
-      if (lastAccumulator == null) {
-        // initialize accumulator
-        lastAccumulator = function.createAccumulators()
-      }
-
-      // emit the rows in order
-      while (!sortedTimestamps.isEmpty) {
-        val curTimestamp = sortedTimestamps.removeFirst()
-        val curRowList = rowMapState.get(curTimestamp)
-        collector.setAbsoluteTimestamp(curTimestamp)
-
-        // process the same timestamp datas, the mechanism is different according ROWS or RANGE
-        processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)
-
-        rowMapState.remove(curTimestamp)
-      }
-
-      accumulatorState.update(lastAccumulator)
-
-      // if are are rows with timestamp > watermark, register a timer for the next watermark
-      if (existEarlyRecord) {
-        ctx.timerService.registerEventTimeTimer(curWatermark + 1)
-      }
-    }
-  }
-
-  /**
-   * Inserts timestamps in order into a linked list.
-   *
-   * If timestamps arrive in order (as in case of using the RocksDB state backend) this is just
-   * an append with O(1).
-   */
-  private def insertToSortedList(recordTimestamp: Long) = {
-    val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
-    var continue = true
-    while (listIterator.hasPrevious && continue) {
-      val timestamp = listIterator.previous
-      if (recordTimestamp >= timestamp) {
-        listIterator.next
-        listIterator.add(recordTimestamp)
-        continue = false
-      }
-    }
-
-    if (continue) {
-      sortedTimestamps.addFirst(recordTimestamp)
-    }
-  }
-
-  /**
-   * Process the same timestamp datas, the mechanism is different between
-   * rows and range window.
-   */
-  def processElementsWithSameTimestamp(
-    curRowList: JList[Row],
-    lastAccumulator: Row,
-    out: Collector[Row]): Unit
-
-}
-
-/**
-  * A ProcessFunction to support unbounded ROWS window.
-  * The ROWS clause defines on a physical level how many rows are included in a window frame.
-  */
-class UnboundedEventTimeRowsOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
-  extends UnboundedEventTimeOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    intermediateType,
-    inputType) {
-
-  override def processElementsWithSameTimestamp(
-    curRowList: JList[Row],
-    lastAccumulator: Row,
-    out: Collector[Row]): Unit = {
-
-    var i = 0
-    while (i < curRowList.size) {
-      val curRow = curRowList.get(i)
-
-      var j = 0
-      // copy forwarded fields to output row
-      function.setForwardedFields(curRow, output)
-
-      // update accumulators and copy aggregates to output row
-      function.accumulate(lastAccumulator, curRow)
-      function.setAggregationResults(lastAccumulator, output)
-      // emit output row
-      out.collect(output)
-      i += 1
-    }
-  }
-}
-
-
-/**
-  * A ProcessFunction to support unbounded RANGE window.
-  * The RANGE option includes all the rows within the window frame
-  * that have the same ORDER BY values as the current row.
-  */
-class UnboundedEventTimeRangeOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
-  extends UnboundedEventTimeOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    intermediateType,
-    inputType) {
-
-  override def processElementsWithSameTimestamp(
-    curRowList: JList[Row],
-    lastAccumulator: Row,
-    out: Collector[Row]): Unit = {
-
-    var i = 0
-    // all same timestamp data should have same aggregation value.
-    while (i < curRowList.size) {
-      val curRow = curRowList.get(i)
-
-      function.accumulate(lastAccumulator, curRow)
-      i += 1
-    }
-
-    // emit output row
-    i = 0
-    while (i < curRowList.size) {
-      val curRow = curRowList.get(i)
-
-      // copy forwarded fields to output row
-      function.setForwardedFields(curRow, output)
-
-      //copy aggregates to output row
-      function.setAggregationResults(lastAccumulator, output)
-      out.collect(output)
-      i += 1
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
deleted file mode 100644
index 1a8399b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function for non-partitioned processing-time unbounded OVER window
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  */
-class UnboundedNonPartitionedProcessingOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row]
-    with CheckpointedFunction
-    with Compiler[GeneratedAggregations] {
-
-  private var accumulators: Row = _
-  private var output: Row = _
-  private var state: ListState[Row] = _
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
-                s"Code:\n$genAggregations.code")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = function.createOutputRow()
-    if (null == accumulators) {
-      val it = state.get().iterator()
-      if (it.hasNext) {
-        accumulators = it.next()
-      } else {
-        accumulators = function.createAccumulators()
-      }
-    }
-  }
-
-  override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
-
-    function.setForwardedFields(input, output)
-
-    function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
-
-    out.collect(output)
-  }
-
-  override def snapshotState(context: FunctionSnapshotContext): Unit = {
-    state.clear()
-    if (null != accumulators) {
-      state.add(accumulators)
-    }
-  }
-
-  override def initializeState(context: FunctionInitializationContext): Unit = {
-    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
-    state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor)
-  }
-}
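The non-partitioned variant above cannot use keyed ValueState, which is why it implements
CheckpointedFunction and persists its single accumulator row as a one-element operator
ListState on snapshot, restoring it in initializeState. A minimal sketch of the same
pattern with a plain running count (the class and state names are made up; the state API
calls mirror the file above):

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    class RunningCount extends ProcessFunction[String, Long] with CheckpointedFunction {
      private var count: Long = 0L
      private var state: ListState[java.lang.Long] = _

      override def processElement(
          in: String,
          ctx: ProcessFunction[String, Long]#Context,
          out: Collector[Long]): Unit = {
        count += 1
        out.collect(count)  // emit the running total for every input
      }

      override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
        state.clear()
        state.add(count)  // persist the single running value, like the accumulator row
      }

      override def initializeState(ctx: FunctionInitializationContext): Unit = {
        val desc = new ListStateDescriptor[java.lang.Long]("countState", BasicTypeInfo.LONG_TYPE_INFO)
        state = ctx.getOperatorStateStore.getOperatorState(desc)
        val it = state.get().iterator()
        if (it.hasNext) count = it.next()  // restore after failover
      }
    }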

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
deleted file mode 100644
index 9a6d4f0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function for processing-time unbounded OVER window
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  */
-class UnboundedProcessingOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
-
-  private var output: Row = _
-  private var state: ValueState[Row] = _
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
-                s"Code:\n$genAggregations.code")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = function.createOutputRow()
-    val stateDescriptor: ValueStateDescriptor[Row] =
-      new ValueStateDescriptor[Row]("overState", aggregationStateType)
-    state = getRuntimeContext.getState(stateDescriptor)
-  }
-
-  override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
-
-    var accumulators = state.value()
-
-    if (null == accumulators) {
-      accumulators = function.createAccumulators()
-    }
-
-    function.setForwardedFields(input, output)
-
-    function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
-
-    state.update(accumulators)
-
-    out.collect(output)
-  }
-
-}
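Stripped of the Flink state plumbing, the per-key flow of processElement above is: read
the key's accumulator, create it on first access, accumulate the input, write the
accumulator back, and emit the updated aggregate. A plain-Scala sketch of that flow
(names are illustrative):

    import scala.collection.mutable

    object PerKeyRunningSum {
      private val acc = mutable.Map[String, Long]()

      def accumulate(key: String, value: Long): Long = {
        val updated = acc.getOrElse(key, 0L) + value  // createAccumulators() + accumulate()
        acc(key) = updated                            // state.update(accumulators)
        updated                                       // setAggregationResults(...)
      }
    }

    // PerKeyRunningSum.accumulate("a", 1L)  // 1
    // PerKeyRunningSum.accumulate("a", 2L)  // 3
    // PerKeyRunningSum.accumulate("b", 5L)  // 5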

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
index 25ec36e..3610898 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -165,7 +165,7 @@ class BoundedProcessingOverRangeProcessFunctionTest {
 
     val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
     val processFunction = new KeyedProcessOperator[String, Row, Row](
-      new BoundedProcessingOverRangeProcessFunction(
+      new ProcTimeBoundedRangeOver(
         genAggFunction,
         1000,
         aggregationStateType,
