This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 743d6b6f02 add `skipUnavailableServers` query option (#13387)
743d6b6f02 is described below

commit 743d6b6f02a2b70ac5168a8eaf3c0a9db8116427
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Mon Jun 17 17:36:09 2024 -0700

    add `skipUnavailableServers` query option (#13387)
    
    * add skipUnavailableServers query option
    
    * fix flakey test
    
    * address comment
---
 .../common/utils/config/QueryOptionsUtils.java     |  4 +
 .../pinot/core/transport/AsyncQueryResponse.java   |  8 ++
 .../apache/pinot/core/transport/QueryRouter.java   | 22 +++++-
 .../pinot/core/transport/QueryRoutingTest.java     | 92 +++++++++++++++++++++-
 .../apache/pinot/spi/utils/CommonConstants.java    |  3 +
 5 files changed, 126 insertions(+), 3 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 12bc750679..fe1b348a28 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -306,4 +306,8 @@ public class QueryOptionsUtils {
     String windowOverflowModeStr = 
queryOptions.get(QueryOptionKey.WINDOW_OVERFLOW_MODE);
     return windowOverflowModeStr != null ? 
WindowOverFlowMode.valueOf(windowOverflowModeStr) : null;
   }
+
+  public static boolean isSkipUnavailableServers(Map<String, String> 
queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS));
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 7bcc90b50d..2ec90ab3b9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -181,4 +181,12 @@ public class AsyncQueryResponse implements QueryResponse {
       markQueryFailed(serverRoutingInstance, exception);
     }
   }
+
+  /**
+   * Wait for one fewer server response. This is used when a server is 
skipped: because
+   * query submission to it failed, we do not want to wait for its response.
+   */
+  void skipServerResponse() {
+    _countDownLatch.countDown();
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index e5b96840e7..b5981785de 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -96,6 +97,9 @@ public class QueryRouter {
     // can prefer but not require TLS until all servers guaranteed to be on TLS
     boolean preferTls = _serverChannelsTls != null;
 
+    // skip unavailable servers if the query option is set
+    boolean skipUnavailableServers = 
isSkipUnavailableServers(offlineBrokerRequest, realtimeBrokerRequest);
+
     // Build map from server to request based on the routing table
     Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>();
     if (offlineBrokerRequest != null) {
@@ -137,14 +141,28 @@ public class QueryRouter {
         break;
       } catch (Exception e) {
         _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1);
-        markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, 
e);
-        break;
+        if (skipUnavailableServers) {
+          asyncQueryResponse.skipServerResponse();
+        } else {
+          markQueryFailed(requestId, serverRoutingInstance, 
asyncQueryResponse, e);
+          break;
+        }
       }
     }
 
     return asyncQueryResponse;
   }
 
+  private boolean isSkipUnavailableServers(@Nullable BrokerRequest 
offlineBrokerRequest,
+      @Nullable BrokerRequest realtimeBrokerRequest) {
+    if (offlineBrokerRequest != null && 
QueryOptionsUtils.isSkipUnavailableServers(
+        offlineBrokerRequest.getPinotQuery().getQueryOptions())) {
+      return true;
+    }
+    return realtimeBrokerRequest != null && 
QueryOptionsUtils.isSkipUnavailableServers(
+        realtimeBrokerRequest.getPinotQuery().getQueryOptions());
+  }
+
   private void markQueryFailed(long requestId, ServerRoutingInstance 
serverRoutingInstance,
       AsyncQueryResponse asyncQueryResponse, Exception e) {
     LOGGER.error("Caught exception while sending request {} to server: {}, 
marking query failed", requestId,
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index cbf4174bca..cec413e424 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.transport;
 
 import com.google.common.util.concurrent.Futures;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -29,6 +30,8 @@ import 
org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
@@ -85,11 +88,15 @@ public class QueryRoutingTest {
   }
 
   private QueryServer getQueryServer(int responseDelayMs, byte[] 
responseBytes) {
+    return getQueryServer(responseDelayMs, responseBytes, TEST_PORT);
+  }
+
+  private QueryServer getQueryServer(int responseDelayMs, byte[] 
responseBytes, int port) {
     ServerMetrics serverMetrics = mock(ServerMetrics.class);
     InstanceRequestHandler handler = new InstanceRequestHandler("server01", 
new PinotConfiguration(),
         mockQueryScheduler(responseDelayMs, responseBytes), serverMetrics, 
mock(AccessControl.class));
     ServerMetrics.register(serverMetrics);
-    return new QueryServer(TEST_PORT, null, handler);
+    return new QueryServer(port, null, handler);
   }
 
   private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] 
responseBytes) {
@@ -286,6 +293,89 @@ public class QueryRoutingTest {
     
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
   }
 
+  @Test
+  public void testSkipUnavailableServer()
+      throws IOException, InterruptedException {
+    // Using a different port is a hack to avoid resource conflicts with other 
tests; ideally, queryServer.shutDown()
+    // should ensure there is no possibility of a resource conflict.
+    int port = 12346;
+    ServerInstance serverInstance1 = new ServerInstance("localhost", port);
+    ServerInstance serverInstance2 = new ServerInstance("localhost", port + 1);
+    ServerRoutingInstance serverRoutingInstance1 =
+        serverInstance1.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
+    ServerRoutingInstance serverRoutingInstance2 =
+        serverInstance2.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
+    Map<ServerInstance, Pair<List<String>, List<String>>> routingTable =
+        Map.of(serverInstance1, Pair.of(Collections.emptyList(), 
Collections.emptyList()), serverInstance2,
+            Pair.of(Collections.emptyList(), Collections.emptyList()));
+
+    long requestId = 123;
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"column1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+    DataTableBuilder builder = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    builder.startRow();
+    builder.setColumn(0, "value1");
+    builder.finishRow();
+    DataTable dataTableSuccess = builder.build();
+    Map<String, String> dataTableMetadata = dataTableSuccess.getMetadata();
+    dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
+    byte[] successResponseBytes = dataTableSuccess.toBytes();
+
+    // Only start a single QueryServer, on port from serverInstance1
+    QueryServer queryServer = getQueryServer(500, successResponseBytes, port);
+    queryServer.start();
+
+    // Submit the query with skipUnavailableServers=true, the single started 
server should return a valid response
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest("SET 
skipUnavailableServers=true; SELECT * FROM testTable");
+    long startTime = System.currentTimeMillis();
+    AsyncQueryResponse asyncQueryResponse =
+        _queryRouter.submitQuery(requestId, "testTable", brokerRequest, 
routingTable, null, null, 10_000L);
+    Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
+    assertEquals(response.size(), 2);
+    assertTrue(response.containsKey(serverRoutingInstance1));
+    assertTrue(response.containsKey(serverRoutingInstance2));
+
+    ServerResponse serverResponse1 = response.get(serverRoutingInstance1);
+    ServerResponse serverResponse2 = response.get(serverRoutingInstance2);
+    assertNotNull(serverResponse1.getDataTable());
+    assertNull(serverResponse2.getDataTable());
+    assertTrue(serverResponse1.getResponseDelayMs() > 500);   // > response 
delay set by getQueryServer
+    assertTrue(serverResponse2.getResponseDelayMs() < 100);   // connection 
refused, no delay
+    assertTrue(System.currentTimeMillis() - startTime > 500); // > response 
delay set by getQueryServer
+    _requestCount += 4;
+    waitForStatsUpdate(_requestCount);
+    assertEquals(
+        
_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance1.getInstanceId()).intValue(),
 0);
+    assertEquals(
+        
_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance2.getInstanceId()).intValue(),
 0);
+
+    // Submit the same query without skipUnavailableServers; neither server 
should return a data table
+    brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM 
testTable");
+    startTime = System.currentTimeMillis();
+    asyncQueryResponse =
+        _queryRouter.submitQuery(requestId, "testTable", brokerRequest, 
routingTable, null, null, 10_000L);
+    response = asyncQueryResponse.getFinalResponses();
+    assertEquals(response.size(), 2);
+    assertTrue(response.containsKey(serverRoutingInstance1));
+    assertTrue(response.containsKey(serverRoutingInstance2));
+
+    serverResponse1 = response.get(serverRoutingInstance1);
+    serverResponse2 = response.get(serverRoutingInstance2);
+    assertNull(serverResponse1.getDataTable());
+    assertNull(serverResponse2.getDataTable());
+    assertTrue(serverResponse1.getResponseDelayMs() < 100);
+    assertTrue(serverResponse2.getResponseDelayMs() < 100);
+    assertTrue(System.currentTimeMillis() - startTime < 100);
+    _requestCount += 4;
+    waitForStatsUpdate(_requestCount);
+    assertEquals(
+        
_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance1.getInstanceId()).intValue(),
 0);
+    assertEquals(
+        
_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance2.getInstanceId()).intValue(),
 0);
+    queryServer.shutDown();
+  }
+
   private void waitForStatsUpdate(long taskCount) {
     TestUtils.waitForCondition(aVoid -> {
       return (_serverRoutingStatsManager.getCompletedTaskCount() == taskCount);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index e268a508af..3753c0e64b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -412,6 +412,9 @@ public class CommonConstants {
         // Indicates the maximum length of serialized response across all 
servers for a query. This value is equally
         // divided across all servers processing the query.
         public static final String MAX_QUERY_RESPONSE_SIZE_BYTES = 
"maxQueryResponseSizeBytes";
+
+        // If query submission causes an exception, still continue to submit 
the query to other servers
+        public static final String SKIP_UNAVAILABLE_SERVERS = 
"skipUnavailableServers";
       }
 
       public static class QueryOptionValue {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to