This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch adss-hotfix
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/adss-hotfix by this push:
new 6f25fa533a AdaptiveServerSelection: update response stats for servers that have not responded (#9801)
6f25fa533a is described below
commit 6f25fa533a1d98e2a0df281625492b3a648b5590
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Mon Nov 14 23:57:58 2022 -0800
AdaptiveServerSelection: update response stats for servers that have not responded (#9801)
---
.../pinot/core/transport/AsyncQueryResponse.java | 26 +++++++++++-
.../apache/pinot/core/transport/QueryRouter.java | 13 +-----
.../pinot/core/transport/QueryRoutingTest.java | 48 +++++++++++++++++++++-
3 files changed, 72 insertions(+), 15 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 6e9bafeb71..ab32316fdb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.datatable.DataTable;
+import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
/**
@@ -43,12 +44,13 @@ public class AsyncQueryResponse implements QueryResponse {
private final CountDownLatch _countDownLatch;
private final long _maxEndTimeMs;
private final long _timeoutMs;
+ private final ServerRoutingStatsManager _serverRoutingStatsManager;
private volatile ServerRoutingInstance _failedServer;
private volatile Exception _exception;
public AsyncQueryResponse(QueryRouter queryRouter, long requestId,
Set<ServerRoutingInstance> serversQueried,
- long startTimeMs, long timeoutMs) {
+ long startTimeMs, long timeoutMs, ServerRoutingStatsManager
serverRoutingStatsManager) {
_queryRouter = queryRouter;
_requestId = requestId;
int numServersQueried = serversQueried.size();
@@ -59,6 +61,7 @@ public class AsyncQueryResponse implements QueryResponse {
_countDownLatch = new CountDownLatch(numServersQueried);
_timeoutMs = timeoutMs;
_maxEndTimeMs = startTimeMs + timeoutMs;
+ _serverRoutingStatsManager = serverRoutingStatsManager;
}
@Override
@@ -84,6 +87,17 @@ public class AsyncQueryResponse implements QueryResponse {
_status.compareAndSet(Status.IN_PROGRESS, finish ? Status.COMPLETED :
Status.TIMED_OUT);
return _responseMap;
} finally {
+ // Update ServerRoutingStats.
+ for (Map.Entry<ServerRoutingInstance, ServerResponse> entry :
_responseMap.entrySet()) {
+ ServerResponse response = entry.getValue();
+ if (response == null || response.getDataTable() == null) {
+ // These are servers from which a response was not received. So
update query response stats for such
+ // servers with maximum latency i.e timeout value.
+
_serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId,
entry.getKey().getInstanceId(),
+ _timeoutMs);
+ }
+ }
+
_queryRouter.markQueryDone(_requestId);
}
}
@@ -134,7 +148,15 @@ public class AsyncQueryResponse implements QueryResponse {
void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable
dataTable, int responseSize,
int deserializationTimeMs) {
- _responseMap.get(serverRoutingInstance).receiveDataTable(dataTable,
responseSize, deserializationTimeMs);
+ ServerResponse response = _responseMap.get(serverRoutingInstance);
+ response.receiveDataTable(dataTable, responseSize, deserializationTimeMs);
+
+ // Record query completion stats immediately after receiving the response
from the server instead of waiting
+ // for all servers to respond. This helps to keep the stats up-to-date.
+ long latencyMs = response.getResponseDelayMs();
+ _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId,
serverRoutingInstance.getInstanceId(),
+ latencyMs);
+
_numServersResponded.getAndIncrement();
_countDownLatch.countDown();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index f2cb638d00..68c58088c3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -120,7 +120,8 @@ public class QueryRouter {
// Create the asynchronous query response with the request map
AsyncQueryResponse asyncQueryResponse =
- new AsyncQueryResponse(this, requestId, requestMap.keySet(),
System.currentTimeMillis(), timeoutMs);
+ new AsyncQueryResponse(this, requestId, requestMap.keySet(),
System.currentTimeMillis(), timeoutMs,
+ _serverRoutingStatsManager);
_asyncQueryResponseMap.put(requestId, asyncQueryResponse);
for (Map.Entry<ServerRoutingInstance, InstanceRequest> entry :
requestMap.entrySet()) {
ServerRoutingInstance serverRoutingInstance = entry.getKey();
@@ -153,8 +154,6 @@ public class QueryRouter {
AsyncQueryResponse asyncQueryResponse, Exception e) {
LOGGER.error("Caught exception while sending request {} to server: {},
marking query failed", requestId,
serverRoutingInstance, e);
- _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId,
serverRoutingInstance.getInstanceId(),
- (int) asyncQueryResponse.getTimeoutMs());
asyncQueryResponse.markQueryFailed(serverRoutingInstance, e);
}
@@ -188,20 +187,12 @@ public class QueryRouter {
// Query future might be null if the query is already done (maybe due to
failure)
if (asyncQueryResponse != null) {
asyncQueryResponse.receiveDataTable(serverRoutingInstance, dataTable,
responseSize, deserializationTimeMs);
-
- // Record query completion stats immediately after receiving the
response from the server instead of waiting
- // for the reduce phase.
- long latencyMs =
asyncQueryResponse.getServerResponseDelayMs(serverRoutingInstance);
- _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId,
serverRoutingInstance.getInstanceId(),
- latencyMs);
}
}
void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception
exception) {
for (AsyncQueryResponse asyncQueryResponse :
_asyncQueryResponseMap.values()) {
asyncQueryResponse.markServerDown(serverRoutingInstance, exception);
-
_serverRoutingStatsManager.recordStatsUponResponseArrival(asyncQueryResponse.getRequestId(),
- serverRoutingInstance.getInstanceId(), (int)
asyncQueryResponse.getTimeoutMs());
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 9fdaf7fb7d..5e1fa04176 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.transport;
import com.google.common.util.concurrent.Futures;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.datatable.DataTable;
@@ -33,7 +34,9 @@ import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -60,10 +63,18 @@ public class QueryRoutingTest {
Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
private QueryRouter _queryRouter;
+ private ServerRoutingStatsManager _serverRoutingStatsManager;
+ int _requestCount;
@BeforeClass
public void setUp() {
- _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class),
mock(ServerRoutingStatsManager.class));
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
true);
+ PinotConfiguration cfg = new PinotConfiguration(properties);
+ _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ _serverRoutingStatsManager.init();
+ _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class),
_serverRoutingStatsManager);
+ _requestCount = 0;
}
private QueryServer getQueryServer(int responseDelayMs, byte[]
responseBytes) {
@@ -88,6 +99,7 @@ public class QueryRoutingTest {
DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
+ String serverId = SERVER_INSTANCE.getInstanceId();
// Start the server
QueryServer queryServer = getQueryServer(0, responseBytes);
@@ -95,13 +107,17 @@ public class QueryRoutingTest {
// OFFLINE only
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 600_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
assertNotNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+ // 2 requests - query submit and query response.
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
// REALTIME only
asyncQueryResponse =
@@ -112,6 +128,9 @@ public class QueryRoutingTest {
serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
assertNotNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
// Hybrid
asyncQueryResponse =
@@ -127,6 +146,9 @@ public class QueryRoutingTest {
serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
assertNotNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+ _requestCount += 4;
+ waitForStatsUpdate(_requestCount);
+
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
// Shut down the server
queryServer.shutDown();
@@ -136,6 +158,7 @@ public class QueryRoutingTest {
public void testInvalidResponse()
throws Exception {
long requestId = 123;
+ String serverId = SERVER_INSTANCE.getInstanceId();
// Start the server
QueryServer queryServer = getQueryServer(0, new byte[0]);
@@ -154,6 +177,9 @@ public class QueryRoutingTest {
assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should time out
assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
// Shut down the server
queryServer.shutDown();
@@ -166,6 +192,7 @@ public class QueryRoutingTest {
DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
+ String serverId = SERVER_INSTANCE.getInstanceId();
// Start the server
QueryServer queryServer = getQueryServer(0, responseBytes);
@@ -184,6 +211,9 @@ public class QueryRoutingTest {
assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should time out
assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
// Shut down the server
queryServer.shutDown();
@@ -199,6 +229,7 @@ public class QueryRoutingTest {
DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
+ String serverId = SERVER_INSTANCE.getInstanceId();
// Start the server
QueryServer queryServer = getQueryServer(500, responseBytes);
@@ -221,6 +252,10 @@ public class QueryRoutingTest {
assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should early terminate
assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs);
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
+
// Submit query after server is down
startTimeMs = System.currentTimeMillis();
@@ -237,6 +272,15 @@ public class QueryRoutingTest {
assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should early terminate
assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs);
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
+ }
+
+ private void waitForStatsUpdate(long taskCount) {
+ TestUtils.waitForCondition(aVoid -> {
+ return (_serverRoutingStatsManager.getCompletedTaskCount() == taskCount);
+ }, 5L, 5000, "Failed to record stats for AdaptiveServerSelectorTest");
}
@AfterClass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]