This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ec47f5af23 [multistage] [testing] Add join operator unit test (#9775)
ec47f5af23 is described below
commit ec47f5af234c22734590dcd21c990d04b62d129a
Author: Yao Liu <[email protected]>
AuthorDate: Thu Nov 17 19:35:48 2022 -0800
[multistage] [testing] Add join operator unit test (#9775)
---
.../query/runtime/operator/HashJoinOperator.java | 55 ++-
.../pinot/query/service/QueryDispatcher.java | 18 +-
.../runtime/operator/AggregateOperatorTest.java | 17 +-
.../runtime/operator/HashJoinOperatorTest.java | 549 +++++++++++++++++++--
4 files changed, 545 insertions(+), 94 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ce0f19177f..a15afee20e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.clearspring.analytics.util.Preconditions;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.datablock.DataBlock;
@@ -48,9 +51,10 @@ import
org.apache.pinot.query.runtime.operator.operands.FilterOperand;
* We currently support left join, inner join and semi join.
* The output is in the format of [left_row, right_row]
*/
+// TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
public class HashJoinOperator extends BaseOperator<TransferableBlock> {
- private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
-
+ private static final String EXPLAIN_NAME = "HASH_JOIN";
+ private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES =
ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT);
private final HashMap<Key, List<Object[]>> _broadcastHashTable;
private final Operator<TransferableBlock> _leftTableOperator;
private final Operator<TransferableBlock> _rightTableOperator;
@@ -63,13 +67,14 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
private KeySelector<Object[], Object[]> _leftKeySelector;
private KeySelector<Object[], Object[]> _rightKeySelector;
- // TODO: Fix inequi join bug. (https://github.com/apache/pinot/issues/9728)
- // TODO: Double check semi join logic.
public HashJoinOperator(Operator<TransferableBlock> leftTableOperator,
Operator<TransferableBlock> rightTableOperator,
DataSchema outputSchema, JoinNode.JoinKeys joinKeys, List<RexExpression>
joinClauses, JoinRelType joinType) {
- // TODO: Handle the case where _leftKeySelector and _rightKeySelector could be null.
+ Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(joinType),
+ "Join type: " + joinType + " is not supported!");
_leftKeySelector = joinKeys.getLeftJoinKeySelector();
_rightKeySelector = joinKeys.getRightJoinKeySelector();
+ Preconditions.checkState(_leftKeySelector != null, "LeftKeySelector for
join cannot be null");
+ Preconditions.checkState(_rightKeySelector != null, "RightKeySelector for
join cannot be null");
_leftTableOperator = leftTableOperator;
_rightTableOperator = rightTableOperator;
_resultSchema = outputSchema;
@@ -98,19 +103,17 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
- if (!_isHashTableBuilt) {
- // Build JOIN hash table
- buildBroadcastHashTable();
- }
-
- if (_upstreamErrorBlock != null) {
- return _upstreamErrorBlock;
- } else if (!_isHashTableBuilt) {
- return TransferableBlockUtils.getNoOpTransferableBlock();
- }
-
- // JOIN each left block with the right block.
try {
+ if (!_isHashTableBuilt) {
+ // Build JOIN hash table
+ buildBroadcastHashTable();
+ }
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ } else if (!_isHashTableBuilt) {
+ return TransferableBlockUtils.getNoOpTransferableBlock();
+ }
+ // JOIN each left block with the right block.
return buildJoinedDataBlock(_leftTableOperator.nextBlock());
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
@@ -145,30 +148,30 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
if (leftBlock.isErrorBlock()) {
_upstreamErrorBlock = leftBlock;
return _upstreamErrorBlock;
- } else if (TransferableBlockUtils.isEndOfStream(leftBlock) ||
TransferableBlockUtils.isNoOpBlock(leftBlock)) {
+ } else if (TransferableBlockUtils.isNoOpBlock(leftBlock) ||
TransferableBlockUtils.isEndOfStream(leftBlock)) {
return leftBlock;
}
-
List<Object[]> rows = new ArrayList<>();
- List<Object[]> container = leftBlock.getContainer();
+ List<Object[]> container = leftBlock.isEndOfStreamBlock() ? new
ArrayList<>() : leftBlock.getContainer();
for (Object[] leftRow : container) {
- List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
- new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
+ // NOTE: Empty key selector will always give same hash code.
+ List<Object[]> hashCollection =
+ _broadcastHashTable.getOrDefault(new
Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
// If it is a left join and right table is empty, we return left rows.
if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) {
rows.add(joinRow(leftRow, null));
} else {
// If it is other type of join.
for (Object[] rightRow : hashCollection) {
+ // TODO: Optimize this to avoid unnecessary object copy.
Object[] resultRow = joinRow(leftRow, rightRow);
- if (_joinClauseEvaluators.isEmpty() ||
_joinClauseEvaluators.stream().allMatch(
- evaluator -> evaluator.apply(resultRow))) {
+ if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream()
+ .allMatch(evaluator -> evaluator.apply(resultRow))) {
rows.add(resultRow);
}
}
}
}
-
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
@@ -178,7 +181,7 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
for (Object obj : leftRow) {
resultRow[idx++] = obj;
}
- if (_joinType != JoinRelType.SEMI && rightRow != null) {
+ if (rightRow != null) {
for (Object obj : rightRow) {
resultRow[idx++] = obj;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 2fe9900bb8..df30f1b617 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -45,16 +45,12 @@ import
org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.roaringbitmap.RoaringBitmap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@code QueryDispatcher} dispatch a query to different workers.
*/
public class QueryDispatcher {
- private static final Logger LOGGER =
LoggerFactory.getLogger(QueryDispatcher.class);
-
private final Map<String, DispatchClient> _dispatchClientMap = new
ConcurrentHashMap<>();
public QueryDispatcher() {
@@ -68,8 +64,8 @@ public class QueryDispatcher {
// run reduce stage and return result.
MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(reduceStageId);
MailboxReceiveOperator mailboxReceiveOperator =
createReduceStageOperator(mailboxService,
-
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
- requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
mailboxService.getHostname(),
+
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
requestId,
+ reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
mailboxService.getHostname(),
mailboxService.getMailboxPort());
List<DataBlock> resultDataBlocks =
reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
@@ -91,9 +87,8 @@ public class QueryDispatcher {
int servicePort = serverInstance.getQueryServicePort();
int mailboxPort = serverInstance.getQueryMailboxPort();
DispatchClient client = getOrCreateDispatchClient(host, servicePort);
- Worker.QueryResponse response =
client.submit(Worker.QueryRequest.newBuilder()
-
.setStagePlan(QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan,
stageId,
- serverInstance)))
+ Worker.QueryResponse response =
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
.putMetadata("REQUEST_ID", String.valueOf(requestId))
.putMetadata("SERVER_INSTANCE_HOST",
serverInstance.getHostname())
.putMetadata("SERVER_INSTANCE_PORT",
String.valueOf(mailboxPort)).build());
@@ -132,8 +127,8 @@ public class QueryDispatcher {
if (TransferableBlockUtils.isEndOfStream(transferableBlock) &&
transferableBlock.isErrorBlock()) {
// TODO: we only received bubble up error from the execution stage tree.
// TODO: query dispatch should also send cancel signal to the rest of the execution stage tree.
- throw new RuntimeException("Received error query execution result
block: "
- + transferableBlock.getDataBlock().getExceptions());
+ throw new RuntimeException(
+ "Received error query execution result block: " +
transferableBlock.getDataBlock().getExceptions());
}
if (transferableBlock.isNoOpBlock()) {
continue;
@@ -154,7 +149,6 @@ public class QueryDispatcher {
for (DataBlock dataBlock : queryResult) {
int numColumns = resultSchema.getColumnNames().length;
int numRows = dataBlock.getNumberOfRows();
- DataSchema.ColumnDataType[] resultColumnDataTypes =
resultSchema.getColumnDataTypes();
List<Object[]> rows = new ArrayList<>(dataBlock.getNumberOfRows());
if (numRows > 0) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 72f2c31f63..5434701974 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.sql.SqlKind;
-import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.Operator;
@@ -111,7 +110,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
Mockito.when(_input.nextBlock())
- .thenReturn(block(inSchema, new Object[]{1, 1}))
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}))
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
@@ -135,7 +134,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
Mockito.when(_input.nextBlock())
- .thenReturn(block(inSchema, new Object[]{2, 1}))
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
@@ -163,7 +162,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
Mockito.when(_input.nextBlock())
- .thenReturn(block(inSchema, new Object[]{2, 3}))
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
@@ -192,8 +191,8 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
Mockito.when(_input.nextBlock())
- .thenReturn(block(inSchema, new Object[]{1, 1}, new Object[]{1, 1}))
- .thenReturn(block(inSchema, new Object[]{1, 1}))
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}, new
Object[]{1, 1}))
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
AggregateOperator.Merger merger =
Mockito.mock(AggregateOperator.Merger.class);
@@ -260,7 +259,7 @@ public class AggregateOperatorTest {
Mockito.when(_input.nextBlock())
// TODO: it is necessary to produce two values here, the operator only throws on second
// (see the comment in Aggregate operator)
- .thenReturn(block(inSchema, new Object[]{2, "foo"}, new Object[]{2,
"foo"}))
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"},
new Object[]{2, "foo"}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
@@ -275,10 +274,6 @@ public class AggregateOperatorTest {
"expected it to fail with class cast exception");
}
- private static TransferableBlock block(DataSchema schema, Object[]... rows) {
- return new TransferableBlock(Arrays.asList(rows), schema,
DataBlock.Type.ROW);
- }
-
private static RexExpression.FunctionCall getSum(RexExpression arg) {
return new RexExpression.FunctionCall(SqlKind.SUM, FieldSpec.DataType.INT,
"SUM", ImmutableList.of(arg));
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 0cf912c279..e88a2703bd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -18,21 +18,51 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class HashJoinOperatorTest {
+ private AutoCloseable _mocks;
+
+ @Mock
+ private Operator<TransferableBlock> _leftOperator;
+
+ @Mock
+ private Operator<TransferableBlock> _rightOperator;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
private static JoinNode.JoinKeys getJoinKeys(List<Integer> leftIdx,
List<Integer> rightIdx) {
FieldSelectionKeySelector leftSelect = new
FieldSelectionKeySelector(leftIdx);
FieldSelectionKeySelector rightSelect = new
FieldSelectionKeySelector(rightIdx);
@@ -40,84 +70,513 @@ public class HashJoinOperatorTest {
}
@Test
- public void testHashJoinKeyCollisionInnerJoin() {
- BaseOperator<TransferableBlock> leftOperator =
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
- BaseOperator<TransferableBlock> rightOperator =
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+ public void shouldHandleHashJoinKeyCollisionInnerJoin() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
List<RexExpression> joinClauses = new ArrayList<>();
- DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo",
"bar"}, new DataSchema.ColumnDataType[]{
- DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
- DataSchema.ColumnDataType.STRING
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_col2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
JoinRelType.INNER);
+
+ TransferableBlock result = joinOnString.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = joinOnString.nextBlock();
+ }
+ List<Object[]> resultRows = result.getContainer();
+ List<Object[]> expectedRows =
+ Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", 2,
"BB"}, new Object[]{2, "BB", 3, "BB"});
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+ Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
+ }
+
+ @Test
+ public void shouldHandleInnerJoinOnInt() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
});
- HashJoinOperator join =
- new HashJoinOperator(leftOperator, rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
- joinClauses, JoinRelType.INNER);
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_col2", "string_co2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
JoinRelType.INNER);
+ TransferableBlock result = joinOnInt.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = joinOnInt.nextBlock();
+ }
+ List<Object[]> resultRows = result.getContainer();
+ List<Object[]> expectedRows = Arrays.asList(new Object[]{2, "BB", 2,
"Aa"}, new Object[]{2, "BB", 2, "BB"});
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+ }
+
+ @Test
+ public void shouldHandleJoinOnEmptySelector() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_col2", "string_co2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses,
JoinRelType.INNER);
+ TransferableBlock result = joinOnInt.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = joinOnInt.nextBlock();
+ }
+ List<Object[]> resultRows = result.getContainer();
+ List<Object[]> expectedRows =
+ Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{1, "Aa", 2,
"BB"}, new Object[]{1, "Aa", 3, "BB"},
+ new Object[]{2, "BB", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"},
new Object[]{2, "BB", 3, "BB"});
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+ Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
+ Assert.assertEquals(resultRows.get(3), expectedRows.get(3));
+ Assert.assertEquals(resultRows.get(4), expectedRows.get(4));
+ Assert.assertEquals(resultRows.get(5), expectedRows.get(5));
+ }
+
+ @Test
+ public void shouldHandleLeftJoin() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "CC"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
JoinRelType.LEFT);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
}
List<Object[]> resultRows = result.getContainer();
- List<Object[]> expectedRows =
- Arrays.asList(new Object[]{1, "Aa", 1, "Aa"}, new Object[]{2, "BB", 2,
"BB"}, new Object[]{2, "BB", 3, "BB"},
- new Object[]{3, "BB", 2, "BB"}, new Object[]{3, "BB", 3, "BB"});
- Assert.assertEquals(expectedRows.size(), resultRows.size());
- Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
- Assert.assertEquals(expectedRows.get(1), resultRows.get(1));
- Assert.assertEquals(expectedRows.get(2), resultRows.get(2));
- Assert.assertEquals(expectedRows.get(3), resultRows.get(3));
- Assert.assertEquals(expectedRows.get(4), resultRows.get(4));
+ List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2,
"Aa"}, new Object[]{2, "CC", null, null});
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
}
@Test
- public void testInnerJoin() {
- BaseOperator<TransferableBlock> leftOperator =
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
- BaseOperator<TransferableBlock> rightOperator =
OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
+ public void shouldPassLeftTableEOS() {
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+
Mockito.when(_leftOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new
Object[]{1, "CC"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
JoinRelType.INNER);
- DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo",
"bar"}, new DataSchema.ColumnDataType[]{
- DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
- DataSchema.ColumnDataType.STRING
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
+ Assert.assertTrue(result.isEndOfStreamBlock());
+ }
+
+ @Test
+ public void shouldHandleLeftJoinOneToN() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
});
- HashJoinOperator join =
- new HashJoinOperator(leftOperator, rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
- joinClauses, JoinRelType.INNER);
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+
Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(leftSchema,
new Object[]{1, "Aa"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new
Object[]{1, "CC"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
JoinRelType.LEFT);
+
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
+ List<Object[]> resultRows = result.getContainer();
+ List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 1,
"BB"}, new Object[]{1, "Aa", 1, "CC"});
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+ }
+
+ @Test
+ public void shouldPassRightTableEOS() {
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new
Object[]{1, "CC"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
Mockito.when(_rightOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
JoinRelType.INNER);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
}
List<Object[]> resultRows = result.getContainer();
- Object[] expRow = new Object[]{1, "Aa", 2, "Aa"};
- List<Object[]> expectedRows = new ArrayList<>();
- expectedRows.add(expRow);
- Assert.assertEquals(expectedRows.size(), resultRows.size());
- Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
+ Assert.assertTrue(resultRows.isEmpty());
}
@Test
- public void testLeftJoin() {
- BaseOperator<TransferableBlock> leftOperator =
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
- BaseOperator<TransferableBlock> rightOperator =
OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
+ public void shouldHandleInequiJoinOnString() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ List<RexExpression> functionOperands = new ArrayList<>();
+ functionOperands.add(new RexExpression.InputRef(1));
+ functionOperands.add(new RexExpression.InputRef(3));
+ joinClauses.add(
+ new RexExpression.FunctionCall(SqlKind.NOT_EQUALS,
FieldSpec.DataType.STRING, "NOT_EQUALS", functionOperands));
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses,
JoinRelType.INNER);
+
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
+ Assert.assertTrue(result.isErrorBlock());
+ MetadataBlock errorBlock = (MetadataBlock) result.getDataBlock();
+
Assert.assertTrue(errorBlock.getExceptions().get(1000).matches(".*notEquals.*"));
+ }
+
+ @Test
+ public void shouldHandleInequiJoinOnInt() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"},
new Object[]{1, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ List<RexExpression> functionOperands = new ArrayList<>();
+ functionOperands.add(new RexExpression.InputRef(0));
+ functionOperands.add(new RexExpression.InputRef(2));
+ joinClauses.add(
+ new RexExpression.FunctionCall(SqlKind.NOT_EQUALS,
FieldSpec.DataType.STRING, "NOT_EQUALS", functionOperands));
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses,
JoinRelType.INNER);
+
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
+ List<Object[]> resultRows = result.getContainer();
+ List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2,
"Aa"}, new Object[]{2, "BB", 1, "BB"});
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*RIGHT is not supported"
+ + ".*")
+ public void shouldThrowOnRightJoin() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo",
"bar"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
JoinRelType.RIGHT);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*SEMI is not "
+ + "supported.*")
+ public void shouldThrowOnSemiJoin() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo",
"bar"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
JoinRelType.SEMI);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*FULL is not supported.*")
+ public void shouldThrowOnFullJoin() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
List<RexExpression> joinClauses = new ArrayList<>();
DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo",
"bar"}, new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING
});
- HashJoinOperator join =
- new HashJoinOperator(leftOperator, rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)),
- joinClauses, JoinRelType.LEFT);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
JoinRelType.FULL);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*ANTI is not supported.*")
+ public void shouldThrowOnAntiJoin() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"},
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new
Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo",
"bar"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses,
JoinRelType.ANTI);
+ }
+
+ @Test
+ public void shouldPropagateRightTableError() {
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new
Object[]{1, "CC"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new
Exception("testInnerJoinRightError")));
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
JoinRelType.INNER);
+
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
+ Assert.assertTrue(result.isErrorBlock());
+
Assert.assertTrue(result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE)
+ .matches("testInnerJoinRightError"));
+ }
+
+ @Test
+ public void shouldPropagateLeftTableError() {
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_rightOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new
Object[]{1, "CC"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new
Exception("testInnerJoinLeftError")));
+
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
JoinRelType.INNER);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
}
+ Assert.assertTrue(result.isErrorBlock());
+ Assert.assertTrue(
+
result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).matches("testInnerJoinLeftError"));
+ }
+
+ @Test
+ public void shouldHandleNoOpBlock() {
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+
Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(rightSchema,
new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+ .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "CC"}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
Mockito.when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.block(rightSchema,
new Object[]{1, "BB"}))
+ .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+ .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}))
+ .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ List<RexExpression> joinClauses = new ArrayList<>();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_co2", "string_col2"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.STRING
+ });
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, resultSchema,
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses,
JoinRelType.INNER);
+
+ TransferableBlock result = join.nextBlock(); // first no-op consumes first
right data block.
+ Assert.assertTrue(result.isNoOpBlock());
+ result = join.nextBlock(); // second no-op consumes no-op right block.
+ Assert.assertTrue(result.isNoOpBlock());
+ result = join.nextBlock(); // third no-op consumes another right data
block.
+ Assert.assertTrue(result.isNoOpBlock());
+ result = join.nextBlock(); // fourth no-op consumes the second no-op right
block.
+ Assert.assertTrue(result.isNoOpBlock());
+ result = join.nextBlock(); // build result using the first left block
List<Object[]> resultRows = result.getContainer();
- List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2,
"Aa"}, new Object[]{2, "BB", null, null},
- new Object[]{3, "BB", null, null});
- Assert.assertEquals(expectedRows.size(), resultRows.size());
- Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
- Assert.assertEquals(expectedRows.get(1), resultRows.get(1));
- Assert.assertEquals(expectedRows.get(2), resultRows.get(2));
+ List<Object[]> expectedRows = ImmutableList.of(new Object[]{2, "BB", 2,
"Aa"});
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ result = join.nextBlock(); // second left block is no-op
+ Assert.assertTrue(result.isNoOpBlock());
+ result = join.nextBlock(); // third left block is a data block and yields another joined row
+ expectedRows = ImmutableList.of(new Object[]{2, "CC", 2, "Aa"});
+ resultRows = result.getContainer();
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+ Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+ result = join.nextBlock(); // last one is EOS.
+ Assert.assertTrue(result.isEndOfStreamBlock());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]