gustavodemorais commented on code in PR #26313:
URL: https://github.com/apache/flink/pull/26313#discussion_r2079642412
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java:
##########
@@ -0,0 +1,474 @@
+package org.apache.flink.table.runtime.operators.join.stream;
+
+// TODO Gustavo Confirm we should create a private custom enum for join types
instead of using
+// Calcite's JoinRelType
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.MultiJoinCondition;
+import
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
+import
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews;
+import
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Streaming multi-way join operator which supports inner join and
left/right/full outer join. It
+ * eliminates the intermediate state necessary for a chain of multiple binary
joins. In other words,
+ * it considerably reduces the total amount of state necessary for chained
joins. As for time
+ * complexity, it performs better in the worst cases where the number of
records in the intermediate
+ * state is large but worse than reordered binary joins when the number of
records in the intermediate
+ * state is small.
+ */
+public class StreamingMultiJoinOperator extends
AbstractStreamOperatorV2<RowData>
+ implements MultipleInputStreamOperator<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingMultiJoinOperator.class);
+ private static final long serialVersionUID = 1L;
+
+ private final List<JoinInputSideSpec> inputSpecs;
+ private final List<JoinRelType> joinTypes;
+ private final List<InternalTypeInfo<RowData>> inputTypes;
+ private final MultiJoinCondition multiJoinCondition;
+ private final boolean[] filterNulls;
+ private final long[] stateRetentionTime;
Review Comment:
Technically, every input can have a different state ttl, e.g.
```
SELECT /*+ STATE_TTL('posts'='6h', 'users'='2d') */ *
FROM posts
JOIN users ON posts.user_id = users.id;
```
When I get to the optimizer, I'll confirm it works for N inputs. But either
way, I think it's safer and more correct for the operator to support a different
state TTL for each input.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]