yashmayya commented on code in PR #17576:
URL: https://github.com/apache/pinot/pull/17576#discussion_r2743166324
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,112 @@ public void testNaturalJoinWithNoVirtualColumns()
assertNotNull(response.get("resultTable"), "Should have result table");
}
+ @Test
+ public void testStageStatsPipelineBreaker()
+ throws Exception {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+ Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+ }
+
+ @Test
+ public void testPipelineBreakerKeepsNumGroupsLimitReached()
+ throws Exception {
+ String query = ""
+ + "SET numGroupsLimit = 1;"
+ + "SELECT * FROM daysOfWeek "
+ + "WHERE dayid in ("
+ + " SELECT DayOfWeek FROM mytable"
+ + " GROUP BY DayOfWeek"
+ + ")";
+
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+ Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+ .describedAs("numGroupsLimitReached should be true even when the limit
is reached on a pipeline breaker")
+ .isEqualTo(true);
+ }
+
+ @Test
+ public void testPipelineBreakerWithoutKeepingStats()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_KEEP_PIPELINE_BREAKER_STATS,
"false");
+ try {
+ // lets try several times to give helix time to propagate the config
change
+ for (int i = 0; i < 10; i++) {
+ try {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ Assert.assertNull(mytableLeaf.get("children"), "When pipeline
breaker stats are not kept, "
+ + "there should be no children under the leaf node");
+ System.out.println("Successfully verified absence of pipeline
breaker stats on attempt " + (i + 1));
Review Comment:
Let's remove this `System.out.println` call?
##########
compatibility-verifier/compCheck.sh:
##########
@@ -527,6 +527,7 @@ if [ -f "${SERVER_CONF_2}" ]; then
stopService server2
startService server2 "$oldTargetDir" "$SERVER_CONF_2"
waitForServer2Ready
+ sleep 300
Review Comment:
What's this `sleep 300` for?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,112 @@ public void testNaturalJoinWithNoVirtualColumns()
assertNotNull(response.get("resultTable"), "Should have result table");
}
+ @Test
+ public void testStageStatsPipelineBreaker()
+ throws Exception {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
Review Comment:
So, if I understand correctly, the structure would look like `LEAF <-
MAILBOX_RECEIVE <- MAILBOX_SEND <- LEAF`? And we don't specifically have a
`PIPELINE_BREAKER` node in the stats?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,112 @@ public void testNaturalJoinWithNoVirtualColumns()
assertNotNull(response.get("resultTable"), "Should have result table");
}
+ @Test
+ public void testStageStatsPipelineBreaker()
+ throws Exception {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+ Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+ }
+
+ @Test
+ public void testPipelineBreakerKeepsNumGroupsLimitReached()
+ throws Exception {
+ String query = ""
+ + "SET numGroupsLimit = 1;"
+ + "SELECT * FROM daysOfWeek "
+ + "WHERE dayid in ("
+ + " SELECT DayOfWeek FROM mytable"
+ + " GROUP BY DayOfWeek"
+ + ")";
+
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+ Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+ .describedAs("numGroupsLimitReached should be true even when the limit
is reached on a pipeline breaker")
+ .isEqualTo(true);
+ }
+
+ @Test
+ public void testPipelineBreakerWithoutKeepingStats()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_KEEP_PIPELINE_BREAKER_STATS,
"false");
+ try {
+ // lets try several times to give helix time to propagate the config
change
+ for (int i = 0; i < 10; i++) {
Review Comment:
Can we use the `waitForCondition` pattern from `TestUtils` instead of this manual retry loop?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,122 @@ public void testNaturalJoinWithNoVirtualColumns()
assertNotNull(response.get("resultTable"), "Should have result table");
}
+ @Test
+ public void testStageStatsPipelineBreaker()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerKeepsNumGroupsLimitReached()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = ""
+ + "SET numGroupsLimit = 1;"
+ + "SELECT * FROM daysOfWeek "
+ + "WHERE dayid in ("
+ + " SELECT DayOfWeek FROM mytable"
+ + " GROUP BY DayOfWeek"
+ + ")";
+
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+ .describedAs("numGroupsLimitReached should be true even when the
limit is reached on a pipeline breaker")
+ .isEqualTo(true);
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerWithoutKeepingStats()
+ throws Exception {
+ // lets try several times to give helix time to propagate the config change
+ for (int i = 0; i < 10; i++) {
+ try {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker
stats are not kept, "
+ + "there should be no children under the leaf node");
+ System.out.println("Successfully verified absence of pipeline breaker
stats on attempt " + (i + 1));
Review Comment:
Let's remove this `System.out.println` call?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,122 @@ public void testNaturalJoinWithNoVirtualColumns()
assertNotNull(response.get("resultTable"), "Should have result table");
}
+ @Test
+ public void testStageStatsPipelineBreaker()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerKeepsNumGroupsLimitReached()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = ""
+ + "SET numGroupsLimit = 1;"
+ + "SELECT * FROM daysOfWeek "
+ + "WHERE dayid in ("
+ + " SELECT DayOfWeek FROM mytable"
+ + " GROUP BY DayOfWeek"
+ + ")";
+
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+ .describedAs("numGroupsLimitReached should be true even when the
limit is reached on a pipeline breaker")
+ .isEqualTo(true);
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerWithoutKeepingStats()
+ throws Exception {
+ // lets try several times to give helix time to propagate the config change
+ for (int i = 0; i < 10; i++) {
Review Comment:
Can we use the `waitForCondition` pattern from `TestUtils` instead of this manual retry loop?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,122 @@ public void testNaturalJoinWithNoVirtualColumns()
assertNotNull(response.get("resultTable"), "Should have result table");
}
+ @Test
+ public void testStageStatsPipelineBreaker()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerKeepsNumGroupsLimitReached()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = ""
+ + "SET numGroupsLimit = 1;"
+ + "SELECT * FROM daysOfWeek "
+ + "WHERE dayid in ("
+ + " SELECT DayOfWeek FROM mytable"
+ + " GROUP BY DayOfWeek"
+ + ")";
+
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+ .describedAs("numGroupsLimitReached should be true even when the
limit is reached on a pipeline breaker")
+ .isEqualTo(true);
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerWithoutKeepingStats()
+ throws Exception {
+ // lets try several times to give helix time to propagate the config change
+ for (int i = 0; i < 10; i++) {
+ try {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker
stats are not kept, "
Review Comment:
So, if I understand correctly, the structure would look like `LEAF` <-
`MAILBOX_RECEIVE` <- `MAILBOX_SEND` <- `LEAF`? And we don't specifically have a
`PIPELINE_BREAKER` node in the stats?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]