This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a29ecd7c27 Return improved dataschema for empty results when all
segments are pruned by broker (#13831)
a29ecd7c27 is described below
commit a29ecd7c27020a3ce598cfe3bc56f9c3016adb3a
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Fri Jan 24 14:01:37 2025 -0800
Return improved dataschema for empty results when all segments are pruned
by broker (#13831)
* Return datatypes when broker prunes all segments
* Fix integration test failure
---
.../BaseSingleStageBrokerRequestHandler.java | 9 ++--
.../tests/EmptyResponseIntegrationTest.java | 51 ++++++++++++++++++++++
...mentGenerationMinionClusterIntegrationTest.java | 2 +-
3 files changed, 58 insertions(+), 4 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 65a2d0a79e..b8c04140dc 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -601,7 +601,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
}
if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
- return getEmptyBrokerOnlyResponse(pinotQuery, requestContext,
tableName, requesterIdentity);
+ return getEmptyBrokerOnlyResponse(pinotQuery, requestContext,
tableName, requesterIdentity, schema, query,
+ database);
}
if (offlineBrokerRequest != null &&
isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) {
@@ -710,7 +711,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
return new BrokerResponseNative(exceptions);
} else {
// When all segments have been pruned, we can just return an empty
response.
- return getEmptyBrokerOnlyResponse(pinotQuery, requestContext,
tableName, requesterIdentity);
+ return getEmptyBrokerOnlyResponse(pinotQuery, requestContext,
tableName, requesterIdentity, schema, query,
+ database);
}
}
long routingEndTimeNs = System.nanoTime();
@@ -895,7 +897,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
}
private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery
pinotQuery, RequestContext requestContext,
- String tableName, @Nullable RequesterIdentity requesterIdentity) {
+ String tableName, @Nullable RequesterIdentity requesterIdentity, Schema
schema, String query, String database) {
if (pinotQuery.isExplain()) {
// EXPLAIN PLAN results to show that query is evaluated exclusively by
Broker.
return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
@@ -903,6 +905,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
// Send empty response since we don't need to evaluate either offline or
realtime request.
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
+ ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema,
database, query);
brokerResponse.setTimeUsedMs(System.currentTimeMillis() -
requestContext.getRequestArrivalTimeMillis());
_queryLogger.log(
new QueryLogger.QueryLogParams(requestContext, tableName,
brokerResponse, requesterIdentity, null));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
index c90b24308a..d49d2555ab 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
+import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
@@ -260,6 +261,56 @@ public class EmptyResponseIntegrationTest extends
BaseClusterIntegrationTestSet
assertDataTypes(response, "LONG", "DOUBLE");
}
+ @Test
+ public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception {
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.setRoutingConfig(
+ new RoutingConfig(null,
Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE), null, null));
+ updateTableConfig(tableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ TableConfig cfg = getOfflineTableConfig();
+ if (cfg.getRoutingConfig() == null ||
cfg.getRoutingConfig().getSegmentPrunerTypes().isEmpty()) {
+ return false;
+ }
+ return true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 600_000L, "Failed to update table config");
+
+ String query =
+ "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch <
-1231231 and FlightNum > 121231231231";
+
+ // Parse the JSON response. Assert that DestAirportID has columnDatatype INT
and Carrier has columnDatatype STRING.
+ JsonNode queryResponse = postQuery(query);
+ assertNoRowsReturned(queryResponse);
+ assertDataTypes(queryResponse, "INT", "STRING");
+
+ query = "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch <
-1231231";
+ queryResponse = postQuery(query);
+ assertNoRowsReturned(queryResponse);
+ assertDataTypes(queryResponse, "INT", "STRING");
+
+ // Reset and remove the Time Segment Pruner
+ tableConfig = getOfflineTableConfig();
+ tableConfig.setRoutingConfig(new RoutingConfig(null,
Collections.emptyList(), null, null));
+ updateTableConfig(tableConfig);
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ TableConfig cfg = getOfflineTableConfig();
+ if (cfg.getRoutingConfig() == null ||
cfg.getRoutingConfig().getSegmentPrunerTypes().size() > 0) {
+ return false;
+ }
+ return true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 600_000L, "Failed to update table config");
+ }
+
+
private void assertNoRowsReturned(JsonNode response) {
assertNotNull(response.get("resultTable"));
assertNotNull(response.get("resultTable").get("rows"));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
index d2adb436d6..bd4673f853 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
@@ -180,6 +180,6 @@ public class SegmentGenerationMinionClusterIntegrationTest
extends BaseClusterIn
String query = "SELECT COUNT(*) FROM " + tableName;
JsonNode response = postQuery(query);
JsonNode resTbl = response.get("resultTable");
- return (resTbl == null) ? 0 : resTbl.get("rows").get(0).get(0).asInt();
+ return (resTbl.get("rows").size() == 0) ? 0 :
resTbl.get("rows").get(0).get(0).asInt();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]