Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3649#discussion_r108983942
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
---
@@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction(
* If timestamps arrive in order (as in case of using the RocksDB state
backend) this is just
* an append with O(1).
*/
- private def insertToSortedList(recordTimeStamp: Long) = {
+ private def insertToSortedList(recordTimestamp: Long) = {
val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
var continue = true
while (listIterator.hasPrevious && continue) {
val timestamp = listIterator.previous
- if (recordTimeStamp >= timestamp) {
+ if (recordTimestamp >= timestamp) {
listIterator.next
- listIterator.add(recordTimeStamp)
+ listIterator.add(recordTimestamp)
continue = false
}
}
if (continue) {
- sortedTimestamps.addFirst(recordTimeStamp)
+ sortedTimestamps.addFirst(recordTimestamp)
}
}
+ /**
+ * Process the same timestamp datas, the mechanism is different between
+ * rows and range window.
+ */
+ def processElementsWithSameTimestamp(
+ curRowList: JList[Row],
+ lastAccumulator: Row,
+ out: Collector[Row]): Unit
+
+}
+
+/**
+ * A ProcessFunction to support unbounded ROWS window.
+ * With the ROWS option you define on a physical level how many rows are
included in your window frame
--- End diff --
This line violates the 100 character limit of the Scala code style.
Please run a local build before opening a PR to capture such problems (`mvn
clean install` inside of the `./flink-libraries/flink-table` folder is usually
sufficient and takes ~5 mins).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---