dawidwys commented on code in PR #24699:
URL: https://github.com/apache/flink/pull/24699#discussion_r1820335891
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java:
##########
@@ -51,7 +71,95 @@ public BatchExecMatch(
}
@Override
- public boolean isProcTime(RowType inputRowType) {
- return true;
+ public void checkOrderKeys(RowType inputRowType) {
+ SortSpec orderKeys = matchSpec.getOrderKeys();
+ if (orderKeys.getFieldSize() == 0) {
+ throw new TableException("You must specify non-empty order by.");
+ }
+
+ SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+ int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+ LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
+
+ if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) {
+ throw new TableException("You must specify time point for order by as the first one.");
+ }
+
+ // time ordering needs to be ascending
+ if (!orderKeys.getAscendingOrders()[0]) {
+ throw new TableException("Primary sort order of a table must be ascending on time.");
+ }
+ }
+
+ @Override
+ protected Transformation<RowData> translateOrder(
+ PlannerBase planner,
+ Transformation<RowData> inputTransform,
+ RowType inputRowType,
+ ExecEdge inputEdge,
+ ExecNodeConfig config) {
+ if (isProcTime(inputRowType)) {
+ // In proctime process records in the order they come.
+ return inputTransform;
+ }
+
+ SortSpec sortSpec = matchSpec.getOrderKeys();
+ RowType inputType = (RowType) inputEdge.getOutputType();
+ SortCodeGenerator codeGen =
+ new SortCodeGenerator(
+ config, planner.getFlinkContext().getClassLoader(), inputType, sortSpec);
+ SortOperator operator =
Review Comment:
> Correct me if I'm wrong: In batch mode events for given key are buffered
> in CepOperator state and they all are processed when onEventTime(MAX_WATERMARK)
> is called. onEventTime() is called exactly once for each key. In onEventTime()
> CepOperator iterates MapState<Long, List<IN>> in ascending order by key (ties
> are resolved using EventComparator<IN> comparator).

Yes, that's my understanding as well.
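
To make the ordering described in the quote concrete, here is a minimal stand-alone sketch of that behaviour for a single key. It is not Flink code and does not use CepOperator, MapState, or EventComparator; the names `BufferedBatchOrdering`, `Event`, and `drainInOrder` are hypothetical, a `TreeMap` stands in for visiting the buffered timestamps in ascending order, and a plain `java.util.Comparator` stands in for the tie-breaking comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Stand-alone model of the buffering discussed above; hypothetical, not Flink's CepOperator. */
final class BufferedBatchOrdering {

    /** Hypothetical event: a timestamp plus a payload used to break ties. */
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /**
     * Buffers all events of one key by timestamp, then drains them once in
     * ascending timestamp order, as a single end-of-input timer firing would.
     * Events sharing a timestamp are ordered by tieBreaker when it is non-null.
     */
    static List<Event> drainInOrder(List<Event> arrivals, Comparator<Event> tieBreaker) {
        // Assumption: TreeMap models "iterate buffered timestamps in ascending order".
        Map<Long, List<Event>> buffer = new TreeMap<>();
        for (Event e : arrivals) {
            buffer.computeIfAbsent(e.timestamp, ts -> new ArrayList<>()).add(e);
        }

        List<Event> ordered = new ArrayList<>();
        for (List<Event> sameTimestamp : buffer.values()) {
            if (tieBreaker != null) {
                sameTimestamp.sort(tieBreaker);
            }
            ordered.addAll(sameTimestamp);
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<Event> arrivals = List.of(
                new Event(30L, "c"),
                new Event(10L, "b"),
                new Event(10L, "a"),
                new Event(20L, "d"));

        Comparator<Event> byPayload = Comparator.comparing(e -> e.payload);
        for (Event e : drainInOrder(arrivals, byPayload)) {
            System.out.println(e.timestamp + " " + e.payload);
        }
    }
}
```

In this model the arrival order does not affect the result, which is the property the question above is checking for batch mode.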