amaliujia commented on a change in pull request #1761:
URL: https://github.com/apache/calcite/pull/1761#discussion_r412488527
##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
##########
@@ -796,4 +803,244 @@ static Expression windowSelector(
outputPhysType.record(expressions),
parameter);
}
+
+ /**
+ * Create enumerable implementation that applies sessionization to elements from the input
+ * enumerator based on a specified key. Elements are windowed into sessions separated by
+ * periods with no input for at least the duration specified by gap parameter.
+ */
+ public static Enumerable<Object[]> sessionize(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ return new AbstractEnumerable<Object[]>() {
+ @Override public Enumerator<Object[]> enumerator() {
+ return new SessionizationEnumerator(inputEnumerator,
+ indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+ }
+ };
+ }
+
+ private static class SessionizationEnumerator implements Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final int indexOfKeyColumn;
+ private final long gap;
+ private LinkedList<Object[]> list;
+ private boolean initialized;
+
+ SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ this.inputEnumerator = inputEnumerator;
+ this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+ this.indexOfKeyColumn = indexOfKeyColumn;
+ this.gap = gap;
+ list = new LinkedList<>();
+ initialized = false;
+ }
+
+ @Override public Object[] current() {
+ if (!initialized) {
+ initialize();
+ initialized = true;
+ }
+ return list.pollFirst();
+ }
+
+ @Override public boolean moveNext() {
+ return initialized ? list.size() > 0 : inputEnumerator.moveNext();
+ }
+
+ @Override public void reset() {
+ list.clear();
+ inputEnumerator.reset();
+ initialized = false;
+ }
+
+ @Override public void close() {
+ list.clear();
+ inputEnumerator.close();
+ initialized = false;
+ }
+
+ private void initialize() {
+ List<Object[]> elements = new ArrayList<>();
+ // initialize() will be called when inputEnumerator.moveNext() is true,
+ // thus firstly should take the current element.
+ elements.add(inputEnumerator.current());
+ // sessionization needs to see all data.
+ while (inputEnumerator.moveNext()) {
Review comment:
Watermark is a concept used to indicate the completeness of data; it is not
necessarily an implementation.
The name "WatermarkedColumn" really means two things:
1. This is the column that provides the event timestamp, i.e. it has "event
timestamp" semantics. (See the "Event time vs. processing time" section in
https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ for more
about these semantics.)
2. Because it has "event timestamp" semantics, we rely on the "watermark" to
know when the data is complete and to define late data.
The windowing implemented by this series of PRs is coupled with "event
timestamp" semantics, so we can simply call it "event timestamp windowing".
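
To make point 1 concrete, here is a minimal, hypothetical Java sketch of session windowing driven purely by event timestamps. These are the values that a column like the one at `indexOfWatermarkedColumn` would carry. Processing time plays no part. The sketch assumes the rows for a single key are already sorted by event timestamp. It opens a new session when a timestamp is strictly greater than the previous timestamp plus `gap`. `EventTimeSessions` and `assignSessions` are illustrative names, not Calcite API, and the exact boundary rule used by the PR may differ.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of "event timestamp windowing" for sessions: session
 * boundaries come only from the event timestamps carried by the rows, never
 * from the wall-clock time at which the rows happen to be processed.
 */
final class EventTimeSessions {
  private EventTimeSessions() {}

  /**
   * Assigns a session id to each timestamp of ONE key. Assumes the input is
   * sorted ascending. A new session starts when a timestamp is strictly greater
   * than the current session's end (latest timestamp seen so far + gap).
   */
  static List<Integer> assignSessions(long[] sortedEventTimestamps, long gap) {
    List<Integer> sessionIds = new ArrayList<>();
    int sessionId = -1;
    long sessionEnd = Long.MIN_VALUE;
    for (long ts : sortedEventTimestamps) {
      if (sessionId < 0 || ts > sessionEnd) {
        sessionId++; // first row, or the gap was exceeded: open a new session
      }
      sessionEnd = ts + gap; // input is sorted, so the latest event defines the end
      sessionIds.add(sessionId);
    }
    return sessionIds;
  }
}
```

For example, timestamps `0, 5, 10, 30, 33` with `gap = 10` form two sessions. The jump from 10 to 30 exceeds the gap, so the result is `[0, 0, 0, 1, 1]`.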
What does this mean for batch and streaming sources? Picture the operator
saying: "Hey, this is the watermarked column, which gives the event timestamp.
Let's apply windowing on this column. Wait a minute, how do I know the data is
complete, so that I can close a window?"
For a streaming source: "Hey, it is the streaming case, and we have a
watermark! If the watermark passes the end of a window, you can mark the window
as closed, and all future data that falls into that window is late."
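
A minimal sketch of the streaming case follows. It assumes tumbling windows of a fixed `size` and a watermark that only moves forward; the class `StreamingWindows` is hypothetical and not part of Calcite. A watermark value `W` promises that no more events with an event timestamp below `W` will arrive. The window `[start, start + size)` can therefore be closed once `W >= start + size`, and any event that maps into an already-closed window is late.

```java
/**
 * Hypothetical sketch of event-time tumbling windows closed by a watermark.
 * The watermark W means "no more events with event time < W will arrive".
 */
final class StreamingWindows {
  private final long size;
  private long watermark = Long.MIN_VALUE; // nothing is known to be complete yet

  StreamingWindows(long size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    this.size = size;
  }

  /** Start of the tumbling window [start, start + size) containing eventTime. */
  long windowStart(long eventTime) {
    return Math.floorDiv(eventTime, size) * size;
  }

  /** Watermarks are monotonic: a value smaller than the current one is ignored. */
  void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
  }

  /** A window is closed once the watermark has passed its (exclusive) end. */
  boolean isClosed(long windowStart) {
    return watermark >= windowStart + size;
  }

  /** An event is late if the window it falls into has already been closed. */
  boolean isLate(long eventTime) {
    return isClosed(windowStart(eventTime));
  }
}
```

For example, with `size = 10`, calling `advanceWatermark(10)` closes the window starting at 0. An event at time 5 is then classified as late, while an event at time 12 is not.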
For a batch source: "Hey, it is the batch case; all data is known and there is
no late data. Just close the windows when all data has been processed. The
watermark also applies, because whenever you ask it, it always tells you that
you can close the window." **In this case we can either implement a watermark
that explicitly says all windows can be closed, or simply close them because
this is batch. After all, it is just an implementation choice.**
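
The "implementation choice" can be sketched as follows, again with hypothetical names and with tumbling windows assumed for brevity. For a bounded input, the watermark is set to `Long.MAX_VALUE` once all rows are consumed. The streaming closing rule (`watermark >= windowEnd`) then closes every window. This gives the same result as simply closing all windows at the end of input.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: two equivalent ways to close windows over a bounded input. */
final class BatchWindowClosing {
  private BatchWindowClosing() {}

  /** Counts events per tumbling window of length size, keyed by window start. */
  private static Map<Long, Integer> countByWindow(List<Long> eventTimes, long size) {
    Map<Long, Integer> counts = new TreeMap<>();
    for (long t : eventTimes) {
      counts.merge(Math.floorDiv(t, size) * size, 1, Integer::sum);
    }
    return counts;
  }

  /** Option 1: model "input exhausted" as a watermark of +infinity. */
  static Map<Long, Integer> closeWithWatermark(List<Long> eventTimes, long size) {
    Map<Long, Integer> counts = countByWindow(eventTimes, size);
    long watermark = Long.MAX_VALUE; // all data is known, so nothing can still arrive
    Map<Long, Integer> closed = new TreeMap<>();
    for (Map.Entry<Long, Integer> e : counts.entrySet()) {
      // Same rule a streaming source would apply: close once the watermark passes the end.
      if (watermark >= e.getKey() + size) {
        closed.put(e.getKey(), e.getValue());
      }
    }
    return closed;
  }

  /** Option 2: no watermark at all; close every window because this is batch. */
  static Map<Long, Integer> closeAtEndOfInput(List<Long> eventTimes, long size) {
    return countByWindow(eventTimes, size);
  }
}
```

Both options emit identical windows. For batch, that is why the choice does not change the observable behavior.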
I think this is really a high-level argument, and it depends on whether you
accept the statement "batch is a special case of streaming". If you do, using
"watermark" is not wrong: the batch case can use all the streaming terms (note
that I am not saying it uses the same implementation). Also, in the future,
after streaming sources are built, whether or not we change the code here, the
behavior of what is built in this PR won't change, because it matches the
"event timestamp windowing" semantics. So it is **backward compatible**.