This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 743d6b6f02 add `skipUnavailableServers` query option (#13387) 743d6b6f02 is described below commit 743d6b6f02a2b70ac5168a8eaf3c0a9db8116427 Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Mon Jun 17 17:36:09 2024 -0700 add `skipUnavailableServers` query option (#13387) * add skipUnavailableServers query option * fix flakey test * address comment --- .../common/utils/config/QueryOptionsUtils.java | 4 + .../pinot/core/transport/AsyncQueryResponse.java | 8 ++ .../apache/pinot/core/transport/QueryRouter.java | 22 +++++- .../pinot/core/transport/QueryRoutingTest.java | 92 +++++++++++++++++++++- .../apache/pinot/spi/utils/CommonConstants.java | 3 + 5 files changed, 126 insertions(+), 3 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 12bc750679..fe1b348a28 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -306,4 +306,8 @@ public class QueryOptionsUtils { String windowOverflowModeStr = queryOptions.get(QueryOptionKey.WINDOW_OVERFLOW_MODE); return windowOverflowModeStr != null ? WindowOverFlowMode.valueOf(windowOverflowModeStr) : null; } + + public static boolean isSkipUnavailableServers(Map<String, String> queryOptions) { + return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS)); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index 7bcc90b50d..2ec90ab3b9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -181,4 +181,12 @@ public class AsyncQueryResponse implements QueryResponse { markQueryFailed(serverRoutingInstance, exception); } } + + /** + * Wait for one less server response. This is used when the server is skipped, as + * query submission will have failed we do not want to wait for the response. + */ + void skipServerResponse() { + _countDownLatch.countDown(); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index e5b96840e7..b5981785de 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -35,6 +35,7 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; @@ -96,6 +97,9 @@ public class QueryRouter { // can prefer but not require TLS until all servers guaranteed to be on TLS boolean preferTls = _serverChannelsTls != null; + // skip unavailable servers if the query option is set + boolean skipUnavailableServers = isSkipUnavailableServers(offlineBrokerRequest, realtimeBrokerRequest); + // Build map from server to request based on the routing table Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>(); if (offlineBrokerRequest != null) { @@ -137,14 +141,28 @@ public class QueryRouter { break; } catch (Exception e) { _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1); - markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e); - break; + if (skipUnavailableServers) { + asyncQueryResponse.skipServerResponse(); + } else { + markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e); + break; + } } } return asyncQueryResponse; } + private boolean isSkipUnavailableServers(@Nullable BrokerRequest offlineBrokerRequest, + @Nullable BrokerRequest realtimeBrokerRequest) { + if (offlineBrokerRequest != null && QueryOptionsUtils.isSkipUnavailableServers( + offlineBrokerRequest.getPinotQuery().getQueryOptions())) { + return true; + } + return realtimeBrokerRequest != null && QueryOptionsUtils.isSkipUnavailableServers( + realtimeBrokerRequest.getPinotQuery().getQueryOptions()); + } + private void markQueryFailed(long requestId, ServerRoutingInstance serverRoutingInstance, AsyncQueryResponse asyncQueryResponse, Exception e) { LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index cbf4174bca..cec413e424 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.transport; import com.google.common.util.concurrent.Futures; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -29,6 +30,8 @@ import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -85,11 +88,15 @@ public class QueryRoutingTest { } private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) { + return getQueryServer(responseDelayMs, responseBytes, TEST_PORT); + } + + private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes, int port) { ServerMetrics serverMetrics = mock(ServerMetrics.class); InstanceRequestHandler handler = new InstanceRequestHandler("server01", new PinotConfiguration(), mockQueryScheduler(responseDelayMs, responseBytes), serverMetrics, mock(AccessControl.class)); ServerMetrics.register(serverMetrics); - return new QueryServer(TEST_PORT, null, handler); + return new QueryServer(port, null, handler); } private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBytes) { @@ -286,6 +293,89 @@ public class QueryRoutingTest { assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); } + @Test + public void testSkipUnavailableServer() + throws IOException, InterruptedException { + // Using a different port is a hack to avoid resource conflict with other tests, ideally queryServer.shutdown() + // should ensure there is no possibility of resource conflict. + int port = 12346; + ServerInstance serverInstance1 = new ServerInstance("localhost", port); + ServerInstance serverInstance2 = new ServerInstance("localhost", port + 1); + ServerRoutingInstance serverRoutingInstance1 = + serverInstance1.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); + ServerRoutingInstance serverRoutingInstance2 = + serverInstance2.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); + Map<ServerInstance, Pair<List<String>, List<String>>> routingTable = + Map.of(serverInstance1, Pair.of(Collections.emptyList(), Collections.emptyList()), serverInstance2, + Pair.of(Collections.emptyList(), Collections.emptyList())); + + long requestId = 123; + DataSchema dataSchema = + new DataSchema(new String[]{"column1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}); + DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); + builder.startRow(); + builder.setColumn(0, "value1"); + builder.finishRow(); + DataTable dataTableSuccess = builder.build(); + Map<String, String> dataTableMetadata = dataTableSuccess.getMetadata(); + dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + byte[] successResponseBytes = dataTableSuccess.toBytes(); + + // Only start a single QueryServer, on port from serverInstance1 + QueryServer queryServer = getQueryServer(500, successResponseBytes, port); + queryServer.start(); + + // Submit the query with skipUnavailableServers=true, the single started server should return a valid response + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest("SET skipUnavailableServers=true; SELECT * FROM testTable"); + long startTime = System.currentTimeMillis(); + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", brokerRequest, routingTable, null, null, 10_000L); + Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 2); + assertTrue(response.containsKey(serverRoutingInstance1)); + assertTrue(response.containsKey(serverRoutingInstance2)); + + ServerResponse serverResponse1 = response.get(serverRoutingInstance1); + ServerResponse serverResponse2 = response.get(serverRoutingInstance2); + assertNotNull(serverResponse1.getDataTable()); + assertNull(serverResponse2.getDataTable()); + assertTrue(serverResponse1.getResponseDelayMs() > 500); // > response delay set by getQueryServer + assertTrue(serverResponse2.getResponseDelayMs() < 100); // connection refused, no delay + assertTrue(System.currentTimeMillis() - startTime > 500); // > response delay set by getQueryServer + _requestCount += 4; + waitForStatsUpdate(_requestCount); + assertEquals( + _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance1.getInstanceId()).intValue(), 0); + assertEquals( + _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance2.getInstanceId()).intValue(), 0); + + // Submit the same query without skipUnavailableServers, the servers should not return any response + brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable"); + startTime = System.currentTimeMillis(); + asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", brokerRequest, routingTable, null, null, 10_000L); + response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 2); + assertTrue(response.containsKey(serverRoutingInstance1)); + assertTrue(response.containsKey(serverRoutingInstance2)); + + serverResponse1 = response.get(serverRoutingInstance1); + serverResponse2 = response.get(serverRoutingInstance2); + assertNull(serverResponse1.getDataTable()); + assertNull(serverResponse2.getDataTable()); + assertTrue(serverResponse1.getResponseDelayMs() < 100); + assertTrue(serverResponse2.getResponseDelayMs() < 100); + assertTrue(System.currentTimeMillis() - startTime < 100); + _requestCount += 4; + waitForStatsUpdate(_requestCount); + assertEquals( + _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance1.getInstanceId()).intValue(), 0); + assertEquals( + _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance2.getInstanceId()).intValue(), 0); + queryServer.shutDown(); + } + private void waitForStatsUpdate(long taskCount) { TestUtils.waitForCondition(aVoid -> { return (_serverRoutingStatsManager.getCompletedTaskCount() == taskCount); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index e268a508af..3753c0e64b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -412,6 +412,9 @@ public class CommonConstants { // Indicates the maximum length of serialized response across all servers for a query. This value is equally // divided across all servers processing the query. public static final String MAX_QUERY_RESPONSE_SIZE_BYTES = "maxQueryResponseSizeBytes"; + + // If query submission causes an exception, still continue to submit the query to other servers + public static final String SKIP_UNAVAILABLE_SERVERS = "skipUnavailableServers"; } public static class QueryOptionValue { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org