This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5626dc9e0d4 IGNITE-24754 SQL Calcite: Fix MergeJoin/NestedLoopJoin
buffers overflow - Fixes #11977.
5626dc9e0d4 is described below
commit 5626dc9e0d456c26dc1bd5d783f23259d248ddc5
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Jul 25 11:19:59 2025 +0500
IGNITE-24754 SQL Calcite: Fix MergeJoin/NestedLoopJoin buffers overflow -
Fixes #11977.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/exec/rel/MergeJoinNode.java | 269 +++++++--------------
.../query/calcite/exec/rel/NestedLoopJoinNode.java | 174 ++++---------
.../calcite/exec/rel/JoinBuffersExecutionTest.java | 238 ++++++++++++++++++
.../calcite/integration/JoinIntegrationTest.java | 5 +-
.../tpch/{TpchTest.java => AbstractTpchTest.java} | 14 +-
.../tpch/{TpchTest.java => TpchScale001Test.java} | 39 +--
.../calcite/integration/tpch/TpchScale010Test.java | 30 +++
.../tpch/{TpchTest.java => TpchScale100Test.java} | 39 +--
.../ignite/testsuites/ExecutionTestSuite.java | 2 +
.../ignite/testsuites/IntegrationTestSuite.java | 8 +-
10 files changed, 437 insertions(+), 381 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index 94648e323ef..1f3f97e40d7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -32,6 +32,9 @@ import org.jetbrains.annotations.NotNull;
/** */
public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
+ /** */
+ private static final int HALF_BUF_SIZE = IN_BUFFER_SIZE >> 1;
+
/** Special value to highlights that all row were received and we are not
waiting any more. */
protected static final int NOT_WAITING = -1;
@@ -50,12 +53,27 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** */
protected int waitingRight;
+ /** */
+ protected Row left;
+
+ /** */
+ protected Row right;
+
/** */
protected final Deque<Row> rightInBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
/** */
protected final Deque<Row> leftInBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+ /** Used to store similar rows of rights stream in many-to-many join mode.
*/
+ protected List<Row> rightMaterialization;
+
+ /** */
+ protected boolean drainMaterialization;
+
+ /** */
+ protected int rightIdx;
+
/** */
protected boolean inLoop;
@@ -66,6 +84,9 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
*/
protected final boolean distributed;
+ /** Flag indicating that join is in finishing stage (one of the inputs has
ended, no more rows will be produced). */
+ protected boolean finishing;
+
/**
* @param ctx Execution context.
* @param comp Join expression.
@@ -78,6 +99,9 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
handler = ctx.rowHandler();
}
+ /** */
+ protected abstract void join() throws Exception;
+
/** {@inheritDoc} */
@Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 2;
@@ -104,8 +128,15 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
waitingLeft = 0;
waitingRight = 0;
+ left = null;
+ right = null;
+
rightInBuf.clear();
leftInBuf.clear();
+
+ rightIdx = 0;
+ rightMaterialization = null;
+ drainMaterialization = false;
}
/** {@inheritDoc} */
@@ -157,7 +188,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
waitingLeft--;
- leftInBuf.add(row);
+ if (!finishing)
+ leftInBuf.add(row);
join();
}
@@ -171,7 +203,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
waitingRight--;
- rightInBuf.add(row);
+ if (!finishing)
+ rightInBuf.add(row);
join();
}
@@ -211,14 +244,44 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
}
/** */
- protected abstract void join() throws Exception;
+ protected boolean leftFinished() {
+ return waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty();
+ }
/** */
- protected void checkJoinFinished() throws Exception {
+ protected boolean rightFinished(boolean withMaterialization) {
+ return waitingRight == NOT_WAITING && right == null &&
rightInBuf.isEmpty()
+ && (!withMaterialization || rightMaterialization == null);
+ }
+
+ /** */
+ protected boolean checkJoinFinished() throws Exception {
+ if (!finishing) {
+ finishing = true;
+ leftInBuf.clear();
+ rightInBuf.clear();
+ rightMaterialization = null;
+ rightIdx = 0;
+ drainMaterialization = false;
+ }
+
if (!distributed || (waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING)) {
requested = 0;
downstream().end();
+
+ return true;
}
+
+ return false;
+ }
+
+ /** */
+ protected void tryToRequestInputs() throws Exception {
+ if (waitingLeft == 0 && leftInBuf.size() <= HALF_BUF_SIZE)
+ leftSource().request(waitingLeft = IN_BUFFER_SIZE -
leftInBuf.size());
+
+ if (waitingRight == 0 && rightInBuf.size() <= HALF_BUF_SIZE)
+ rightSource().request(waitingRight = IN_BUFFER_SIZE -
rightInBuf.size());
}
/** */
@@ -260,21 +323,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** */
private static class InnerJoin<Row> extends MergeJoinNode<Row> {
- /** */
- private Row left;
-
- /** */
- private Row right;
-
- /** Used to store similar rows of rights stream in many-to-many join
mode. */
- private List<Row> rightMaterialization;
-
- /** */
- private int rightIdx;
-
- /** */
- private boolean drainMaterialization;
-
/**
* @param ctx Execution context.
* @param rowType Row type.
@@ -285,17 +333,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
super(ctx, rowType, comp, distributed);
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- right = null;
- rightIdx = 0;
- rightMaterialization = null;
- drainMaterialization = false;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
inLoop = true;
@@ -392,16 +429,10 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
inLoop = false;
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0)
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if (requested > 0 && (leftFinished() || rightFinished(true)) &&
checkJoinFinished())
+ return;
- if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null
&& leftInBuf.isEmpty())
- || (waitingRight == NOT_WAITING && right == null &&
rightInBuf.isEmpty() && rightMaterialization == null))
- )
- checkJoinFinished();
+ tryToRequestInputs();
}
}
@@ -410,21 +441,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** Right row factory. */
private final RowHandler.RowFactory<Row> rightRowFactory;
- /** */
- private Row left;
-
- /** */
- private Row right;
-
- /** Used to store similar rows of rights stream in many-to-many join
mode. */
- private List<Row> rightMaterialization;
-
- /** */
- private int rightIdx;
-
- /** */
- private boolean drainMaterialization;
-
/** Whether current left row was matched (hence pushed to downstream)
or not. */
private boolean matched;
@@ -447,17 +463,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
this.rightRowFactory = rightRowFactory;
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- right = null;
- rightIdx = 0;
- rightMaterialization = null;
- drainMaterialization = false;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
inLoop = true;
@@ -577,14 +582,10 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
inLoop = false;
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0)
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if (requested > 0 && leftFinished() && checkJoinFinished())
+ return;
- if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty())
- checkJoinFinished();
+ tryToRequestInputs();
}
}
@@ -593,21 +594,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** Right row factory. */
private final RowHandler.RowFactory<Row> leftRowFactory;
- /** */
- private Row left;
-
- /** */
- private Row right;
-
- /** Used to store similar rows of rights stream in many-to-many join
mode. */
- private List<Row> rightMaterialization;
-
- /** */
- private int rightIdx;
-
- /** */
- private boolean drainMaterialization;
-
/** Whether current right row was matched (hence pushed to downstream)
or not. */
private boolean matched;
@@ -630,17 +616,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
this.leftRowFactory = leftRowFactory;
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- right = null;
- rightIdx = 0;
- rightMaterialization = null;
- drainMaterialization = false;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
inLoop = true;
@@ -772,14 +747,10 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
inLoop = false;
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0)
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if (requested > 0 && rightFinished(true) && checkJoinFinished())
+ return;
- if (requested > 0 && waitingRight == NOT_WAITING && right == null
&& rightInBuf.isEmpty() && rightMaterialization == null)
- checkJoinFinished();
+ tryToRequestInputs();
}
}
@@ -791,21 +762,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** Right row factory. */
private final RowHandler.RowFactory<Row> rightRowFactory;
- /** */
- private Row left;
-
- /** */
- private Row right;
-
- /** Used to store similar rows of rights stream in many-to-many join
mode. */
- private List<Row> rightMaterialization;
-
- /** */
- private int rightIdx;
-
- /** */
- private boolean drainMaterialization;
-
/** Whether current left row was matched (hence pushed to downstream)
or not. */
private boolean leftMatched;
@@ -834,17 +790,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
this.rightRowFactory = rightRowFactory;
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- right = null;
- rightIdx = 0;
- rightMaterialization = null;
- drainMaterialization = false;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
inLoop = true;
@@ -1006,27 +951,15 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
inLoop = false;
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
+ if (requested > 0 && leftFinished() && rightFinished(true) &&
checkJoinFinished())
+ return;
- if (waitingLeft == 0)
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
-
- if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty()
- && waitingRight == NOT_WAITING && right == null &&
rightInBuf.isEmpty() && rightMaterialization == null
- )
- checkJoinFinished();
+ tryToRequestInputs();
}
}
/** */
private static class SemiJoin<Row> extends MergeJoinNode<Row> {
- /** */
- private Row left;
-
- /** */
- private Row right;
-
/**
* @param ctx Execution context.
* @param rowType Row type.
@@ -1037,14 +970,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
super(ctx, rowType, comp, distributed);
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- right = null;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
inLoop = true;
@@ -1081,27 +1006,15 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
inLoop = false;
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0)
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if (requested > 0 && (leftFinished() || rightFinished(false)) &&
checkJoinFinished())
+ return;
- if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null
&& leftInBuf.isEmpty()
- || (waitingRight == NOT_WAITING && right == null &&
rightInBuf.isEmpty())))
- )
- checkJoinFinished();
+ tryToRequestInputs();
}
}
/** */
private static class AntiJoin<Row> extends MergeJoinNode<Row> {
- /** */
- private Row left;
-
- /** */
- private Row right;
-
/**
* @param ctx Execution context.
* @param rowType Row type.
@@ -1112,14 +1025,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
super(ctx, rowType, comp, distributed);
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- right = null;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
inLoop = true;
@@ -1159,14 +1064,10 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
inLoop = false;
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0)
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if (requested > 0 && leftFinished() && checkJoinFinished())
+ return;
- if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty())
- checkJoinFinished();
+ tryToRequestInputs();
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index 581b7366b1e..abca013eafe 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -32,6 +32,9 @@ import org.jetbrains.annotations.NotNull;
/** */
public abstract class NestedLoopJoinNode<Row> extends MemoryTrackingNode<Row> {
+ /** */
+ private static final int HALF_BUF_SIZE = IN_BUFFER_SIZE >> 1;
+
/** Special value to highlights that all row were received and we are not
waiting any more. */
protected static final int NOT_WAITING = -1;
@@ -44,6 +47,12 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** */
protected int requested;
+ /** */
+ protected Row left;
+
+ /** */
+ protected int rightIdx;
+
/** */
protected int waitingLeft;
@@ -98,6 +107,9 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
rightMaterialized.clear();
leftInBuf.clear();
+
+ left = null;
+ rightIdx = 0;
}
/** {@inheritDoc} */
@@ -247,12 +259,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** */
private static class InnerJoin<Row> extends NestedLoopJoinNode<Row> {
- /** */
- private Row left;
-
- /** */
- private int rightIdx;
-
/**
* @param ctx Execution context.
* @param cond Join expression.
@@ -261,14 +267,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
super(ctx, rowType, cond);
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- rightIdx = 0;
-
- super.rewindInternal();
- }
-
/** */
@Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
@@ -300,16 +298,10 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
}
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0 && leftInBuf.isEmpty())
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if (checkJoinFinished())
+ return;
- if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING && left == null && leftInBuf.isEmpty()) {
- requested = 0;
- downstream().end();
- }
+ tryToRequestInputs();
}
}
@@ -321,12 +313,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** Whether current left row was matched or not. */
private boolean matched;
- /** */
- private Row left;
-
- /** */
- private int rightIdx;
-
/**
* @param ctx Execution context.
* @param cond Join expression.
@@ -345,8 +331,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** */
@Override protected void rewindInternal() {
matched = false;
- left = null;
- rightIdx = 0;
super.rewindInternal();
}
@@ -398,16 +382,10 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
}
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
+ if (checkJoinFinished())
+ return;
- if (waitingLeft == 0 && leftInBuf.isEmpty())
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
-
- if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING && left == null && leftInBuf.isEmpty()) {
- requested = 0;
- downstream().end();
- }
+ tryToRequestInputs();
}
}
@@ -422,12 +400,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** */
private int lastPushedInd;
- /** */
- private Row left;
-
- /** */
- private int rightIdx;
-
/**
* @param ctx Execution context.
* @param cond Join expression.
@@ -445,10 +417,8 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** {@inheritDoc} */
@Override protected void rewindInternal() {
- left = null;
rightNotMatchedIndexes.clear();
lastPushedInd = 0;
- rightIdx = 0;
super.rewindInternal();
}
@@ -523,17 +493,10 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
}
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0 && leftInBuf.isEmpty())
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if ((rightNotMatchedIndexes == null ||
rightNotMatchedIndexes.isEmpty()) && checkJoinFinished())
+ return;
- if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING && left == null
- && leftInBuf.isEmpty() && rightNotMatchedIndexes.isEmpty()) {
- requested = 0;
- downstream().end();
- }
+ tryToRequestInputs();
}
}
@@ -554,12 +517,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** */
private int lastPushedInd;
- /** */
- private Row left;
-
- /** */
- private int rightIdx;
-
/**
* @param ctx Execution context.
* @param cond Join expression.
@@ -579,11 +536,9 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
/** {@inheritDoc} */
@Override protected void rewindInternal() {
- left = null;
leftMatched = false;
rightNotMatchedIndexes.clear();
lastPushedInd = 0;
- rightIdx = 0;
super.rewindInternal();
}
@@ -673,28 +628,15 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
}
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0 && leftInBuf.isEmpty())
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if ((rightNotMatchedIndexes == null ||
rightNotMatchedIndexes.isEmpty()) && checkJoinFinished())
+ return;
- if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING && left == null
- && leftInBuf.isEmpty() && rightNotMatchedIndexes.isEmpty()) {
- requested = 0;
- downstream().end();
- }
+ tryToRequestInputs();
}
}
/** */
private static class SemiJoin<Row> extends NestedLoopJoinNode<Row> {
- /** */
- private Row left;
-
- /** */
- private int rightIdx;
-
/**
* @param ctx Execution context.
* @param cond Join expression.
@@ -703,14 +645,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
super(ctx, rowType, cond);
}
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- left = null;
- rightIdx = 0;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
@@ -739,28 +673,15 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
}
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
-
- if (waitingLeft == 0 && leftInBuf.isEmpty())
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ if (checkJoinFinished())
+ return;
- if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING && left == null
- && leftInBuf.isEmpty()) {
- downstream().end();
- requested = 0;
- }
+ tryToRequestInputs();
}
}
/** */
private static class AntiJoin<Row> extends NestedLoopJoinNode<Row> {
- /** */
- private Row left;
-
- /** */
- private int rightIdx;
-
/**
* @param ctx Execution context.
* @param cond Join expression.
@@ -769,14 +690,6 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
super(ctx, rowType, cond);
}
- /** */
- @Override protected void rewindInternal() {
- left = null;
- rightIdx = 0;
-
- super.rewindInternal();
- }
-
/** {@inheritDoc} */
@Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
@@ -809,16 +722,33 @@ public abstract class NestedLoopJoinNode<Row> extends
MemoryTrackingNode<Row> {
}
}
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
+ if (checkJoinFinished())
+ return;
- if (waitingLeft == 0 && leftInBuf.isEmpty())
- leftSource().request(waitingLeft = IN_BUFFER_SIZE);
+ tryToRequestInputs();
+ }
+ }
- if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING && left == null && leftInBuf.isEmpty()) {
- requested = 0;
- downstream().end();
- }
+ /** */
+ protected boolean checkJoinFinished() throws Exception {
+ if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING && left == null && leftInBuf.isEmpty()) {
+ requested = 0;
+ rightMaterialized.clear();
+
+ downstream().end();
+
+ return true;
}
+
+ return false;
+ }
+
+ /** */
+ protected void tryToRequestInputs() throws Exception {
+ if (waitingLeft == 0 && leftInBuf.size() <= HALF_BUF_SIZE)
+ leftSource().request(waitingLeft = IN_BUFFER_SIZE -
leftInBuf.size());
+
+ if (waitingRight == 0 && requested > 0)
+ rightSource().request(waitingRight = IN_BUFFER_SIZE);
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/JoinBuffersExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/JoinBuffersExecutionTest.java
new file mode 100644
index 00000000000..f5b01c376fc
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/JoinBuffersExecutionTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.calcite.rel.core.JoinRelType.ANTI;
+import static org.apache.calcite.rel.core.JoinRelType.FULL;
+import static org.apache.calcite.rel.core.JoinRelType.INNER;
+import static org.apache.calcite.rel.core.JoinRelType.LEFT;
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
+import static org.apache.calcite.rel.core.JoinRelType.SEMI;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class JoinBuffersExecutionTest extends AbstractExecutionTest {
+ /** Tests merge join with input bigger than the buffer size. */
+ @Test
+ public void testMergeJoinBuffers() throws Exception {
+ JoinFactory joinFactory = (ctx, outType, leftType, rightType,
joinType, cond) ->
+ MergeJoinNode.create(ctx, outType, leftType, rightType, joinType,
Comparator.comparingInt(r -> (Integer)r[0]), true);
+
+ Consumer<AbstractNode<?>> bufChecker = (node) -> {
+ assertTrue(((MergeJoinNode<?>)node).leftInBuf.size() <=
IN_BUFFER_SIZE);
+
+ assertTrue(((MergeJoinNode<?>)node).rightInBuf.size() <=
IN_BUFFER_SIZE);
+ };
+
+ doTestJoinBuffer(joinFactory, bufChecker);
+ }
+
+ /** Tests NL join with input bigger than the buffer size. */
+ @Test
+ public void testNLJoinBuffers() throws Exception {
+ JoinFactory joinFactory = (ctx, outType, leftType, rightType,
joinType, cond) ->
+ NestedLoopJoinNode.create(ctx, outType, leftType, rightType,
joinType, (r1, r2) -> r1[0].equals(r2[0]));
+
+ Consumer<AbstractNode<?>> bufChecker = (node) ->
+ assertTrue(((NestedLoopJoinNode<?>)node).leftInBuf.size() <=
IN_BUFFER_SIZE);
+
+ doTestJoinBuffer(joinFactory, bufChecker);
+ }
+
+ /**
+ * @param joinFactory Creates certain join node.
+ * @param joinBufChecker Finally checks the node after a successful run.
+ */
+ private void doTestJoinBuffer(
+ JoinFactory joinFactory,
+ Consumer<AbstractNode<?>> joinBufChecker
+ ) throws Exception {
+ for (JoinRelType joinType : F.asList(LEFT, INNER, RIGHT, FULL, SEMI,
ANTI)) {
+ if (log.isInfoEnabled())
+ log.info("Testing join of type '" + joinType + "'...");
+
+ int size = IN_BUFFER_SIZE * 2 + IN_BUFFER_SIZE / 2;
+ int intersect = Math.max(10, IN_BUFFER_SIZE / 10);
+
+ int leftTo = size + intersect;
+ int rightTo = size * 2;
+
+ ExecutionContext<Object[]> ctx =
executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+ Iterator<Object[]> leftIter = IntStream.range(0,
leftTo).boxed().map(i -> new Object[] {i}).iterator();
+ Iterator<Object[]> rightIter = IntStream.range(size,
rightTo).boxed().map(i -> new Object[] {i}).iterator();
+
+ RelDataType leftType =
TypeUtils.createRowType(ctx.getTypeFactory(), int.class);
+ ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, () ->
leftIter);
+
+ RelDataType rightType =
TypeUtils.createRowType(ctx.getTypeFactory(), int.class);
+ ScanNode<Object[]> rightNode = new ScanNode<>(ctx, rightType, ()
-> rightIter);
+
+ RelDataType outType =
TypeUtils.createRowType(ctx.getTypeFactory(), int.class, int.class);
+
+ AbstractNode<Object[]> join = joinFactory.create(ctx, outType,
leftType, rightType, joinType,
+ (r1, r2) -> r1[0].equals(r2[0]));
+
+ join.register(F.asList(leftNode, rightNode));
+
+ List<Object[]> res = new ArrayList<>();
+
+ AtomicBoolean finished = new AtomicBoolean();
+
+ join.onRegister(new Downstream<>() {
+ @Override public void push(Object[] objects) {
+ res.add(objects);
+ }
+
+ @Override public void end() {
+ finished.set(true);
+ }
+
+ @Override public void onError(Throwable e) {
+ // No-op.
+ }
+ });
+
+ join.request(1);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> !res.isEmpty(),
getTestTimeout()));
+
+ joinBufChecker.accept(join);
+
+ join.request(size * size);
+
+ assertTrue(GridTestUtils.waitForCondition(finished::get,
getTestTimeout()));
+
+ switch (joinType) {
+ case LEFT:
+ assertEquals(size + intersect, res.size());
+
+ for (int i = 0; i < size; ++i) {
+ assertEquals(i, res.get(i)[0]);
+ assertEquals(null, res.get(i)[1]);
+ }
+
+ for (int i = size; i < size + intersect; ++i) {
+ assertEquals(i, res.get(i)[0]);
+ assertEquals(i, res.get(i)[1]);
+ }
+ break;
+
+ case INNER:
+ assertEquals(intersect, res.size());
+
+ for (int i = size; i < size + intersect; ++i) {
+ assertEquals(i, res.get(i - size)[0]);
+ assertEquals(i, res.get(i - size)[1]);
+ }
+ break;
+
+ case RIGHT:
+ assertEquals(rightTo - size, res.size());
+
+ for (int i = size; i < size + intersect; ++i) {
+ assertEquals(i, res.get(i - size)[0]);
+ assertEquals(i, res.get(i - size)[1]);
+ }
+
+ for (int i = size + intersect; i < size << 1; ++i) {
+ assertEquals(null, res.get(i - size)[0]);
+ assertEquals(i, res.get(i - size)[1]);
+ }
+ break;
+
+ case FULL:
+ assertEquals(size * 2, res.size());
+
+ for (int i = 0; i < size; ++i) {
+ assertEquals(i, res.get(i)[0]);
+ assertEquals(null, res.get(i)[1]);
+ }
+
+ for (int i = size; i < size + intersect; ++i) {
+ assertEquals(i, res.get(i)[0]);
+ assertEquals(i, res.get(i)[1]);
+ }
+
+ for (int i = size + intersect; i < size << 1; ++i) {
+ assertEquals(null, res.get(i)[0]);
+ assertEquals(i, res.get(i)[1]);
+ }
+ break;
+
+ case SEMI:
+ assertEquals(intersect, res.size());
+
+ for (int i = 0; i < intersect; ++i) {
+ assertEquals(1, res.get(i).length);
+ assertEquals(size + i, res.get(i)[0]);
+ }
+ break;
+
+ case ANTI:
+ assertEquals(size, res.size());
+
+ for (int i = 0; i < size; ++i) {
+ assertEquals(1, res.get(i).length);
+ assertEquals(i, res.get(i)[0]);
+ }
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unsupported join
type: " + join);
+ }
+
+ joinBufChecker.accept(join);
+ }
+ }
+
+ /** */
+ @FunctionalInterface
+ protected interface JoinFactory {
+ /** */
+ AbstractNode<Object[]> create(
+ ExecutionContext<Object[]> ctx,
+ RelDataType outType,
+ RelDataType leftType,
+ RelDataType rightType,
+ JoinRelType joinType,
+ BiPredicate<Object[], Object[]> cond
+ );
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java
index 603a7123b36..d62178c56ee 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java
@@ -39,9 +39,8 @@ public class JoinIntegrationTest extends
AbstractBasicIntegrationTransactionalTe
List<Object[]> params = new ArrayList<>();
for (SqlTransactionMode sqlTxMode : SqlTransactionMode.values()) {
- for (JoinType jt : JoinType.values()) {
- params.add(new Object[]{sqlTxMode, jt});
- }
+ for (JoinType jt : JoinType.values())
+ params.add(new Object[] {sqlTxMode, jt});
}
return params;
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/AbstractTpchTest.java
similarity index 83%
copy from
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java
copy to
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/AbstractTpchTest.java
index df59a9b2387..19e91d4a85b 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/AbstractTpchTest.java
@@ -26,15 +26,21 @@ import org.junit.runners.Parameterized;
/** */
@RunWith(Parameterized.class)
-public class TpchTest extends AbstractBasicIntegrationTest {
+public abstract class AbstractTpchTest extends AbstractBasicIntegrationTest {
+ /** */
+ protected static final Collection<Integer> USED_TESTS = F.asList(16, 17, 19, 20);
+
/** Query ID. */
@Parameterized.Parameter
public int qryId;
+ /** */
+ protected abstract double scale();
+
/** */
@Parameterized.Parameters(name = "queryId={0}")
- public static Collection<Object> params() {
- return F.asList(16, 19, 20);
+ public static Collection<Integer> params() {
+ return USED_TESTS;
}
/** {@inheritDoc} */
@@ -43,7 +49,7 @@ public class TpchTest extends AbstractBasicIntegrationTest {
TpchHelper.createTables(client);
- TpchHelper.fillTables(client, 0.1);
+ TpchHelper.fillTables(client, scale());
TpchHelper.collectSqlStatistics(client);
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale001Test.java
similarity index 50%
copy from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java
copy to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale001Test.java
index df59a9b2387..205cdea3339 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale001Test.java
@@ -17,47 +17,14 @@
package org.apache.ignite.internal.processors.query.calcite.integration.tpch;
-import java.util.Collection;
-import org.apache.ignite.internal.processors.query.calcite.integration.AbstractBasicIntegrationTest;
-import org.apache.ignite.internal.util.typedef.F;
-import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** */
@RunWith(Parameterized.class)
-public class TpchTest extends AbstractBasicIntegrationTest {
- /** Query ID. */
- @Parameterized.Parameter
- public int qryId;
-
- /** */
- @Parameterized.Parameters(name = "queryId={0}")
- public static Collection<Object> params() {
- return F.asList(16, 19, 20);
- }
-
+public class TpchScale001Test extends AbstractTpchTest {
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- TpchHelper.createTables(client);
-
- TpchHelper.fillTables(client, 0.1);
-
- TpchHelper.collectSqlStatistics(client);
- }
-
- /** {@inheritDoc} */
- @Override protected boolean destroyCachesAfterTest() {
- return false;
- }
-
- /**
- * Test the TPC-H query can be planned and executed.
- */
- @Test
- public void test() {
- sql(TpchHelper.getQuery(qryId));
+ @Override protected double scale() {
+ return 0.01;
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale010Test.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale010Test.java
new file mode 100644
index 00000000000..b6d39f13f5d
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale010Test.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration.tpch;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** */
+@RunWith(Parameterized.class)
+public class TpchScale010Test extends AbstractTpchTest {
+ /** */
+ @Override protected double scale() {
+ return 0.1;
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale100Test.java
similarity index 55%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale100Test.java
index df59a9b2387..8578fb66c06 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/TpchScale100Test.java
@@ -18,46 +18,25 @@
package org.apache.ignite.internal.processors.query.calcite.integration.tpch;
import java.util.Collection;
-import org.apache.ignite.internal.processors.query.calcite.integration.AbstractBasicIntegrationTest;
-import org.apache.ignite.internal.util.typedef.F;
-import org.junit.Test;
+import java.util.LinkedHashSet;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** */
@RunWith(Parameterized.class)
-public class TpchTest extends AbstractBasicIntegrationTest {
- /** Query ID. */
- @Parameterized.Parameter
- public int qryId;
-
- /** */
+public class TpchScale100Test extends AbstractTpchTest {
+ /** TODO Revise after https://issues.apache.org/jira/browse/IGNITE-25129 */
@Parameterized.Parameters(name = "queryId={0}")
- public static Collection<Object> params() {
- return F.asList(16, 19, 20);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
+ public static Collection<Integer> params() {
+ Collection<Integer> res = new LinkedHashSet<>(USED_TESTS);
- TpchHelper.createTables(client);
+ res.remove(16);
- TpchHelper.fillTables(client, 0.1);
-
- TpchHelper.collectSqlStatistics(client);
+ return res;
}
/** {@inheritDoc} */
- @Override protected boolean destroyCachesAfterTest() {
- return false;
- }
-
- /**
- * Test the TPC-H query can be planned and executed.
- */
- @Test
- public void test() {
- sql(TpchHelper.getQuery(qryId));
+ @Override protected double scale() {
+ return 1.0;
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index a9392bf7abb..bd6f6eb2258 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregat
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashIndexSpoolExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.JoinBuffersExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest;
@@ -45,6 +46,7 @@ import org.junit.runners.Suite;
ContinuousExecutionTest.class,
MergeJoinExecutionTest.class,
NestedLoopJoinExecutionTest.class,
+ JoinBuffersExecutionTest.class,
TableSpoolExecutionTest.class,
SortedIndexSpoolExecutionTest.class,
HashIndexSpoolExecutionTest.class,
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 973c2040011..e5264bc0b02 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -78,7 +78,9 @@ import org.apache.ignite.internal.processors.query.calcite.integration.UserDdlIn
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTransactionalTest;
import org.apache.ignite.internal.processors.query.calcite.integration.ViewsIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale001Test;
+import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale010Test;
+import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale100Test;
import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcCrossEngineTest;
import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
import org.apache.ignite.internal.processors.query.calcite.rules.JoinCommuteRulesTest;
@@ -160,7 +162,9 @@ import org.junit.runners.Suite;
SqlPlanHistoryIntegrationTest.class,
QueryBlockingTaskExecutorIntegrationTest.class,
ScalarInIntegrationTest.class,
- TpchTest.class,
+ TpchScale001Test.class,
+ TpchScale010Test.class,
+ TpchScale100Test.class,
UnnestIntegrationTest.class,
CalcitePlanningDumpTest.class,
})