This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/29.0.0 by this push:
new f3a89ba8e73 Report zero values instead of unknown for empty ingest
queries (#15674) (#15791)
f3a89ba8e73 is described below
commit f3a89ba8e73c163dabe16d17b0a86619086153da
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Jan 30 21:33:13 2024 +0530
Report zero values instead of unknown for empty ingest queries (#15674)
(#15791)
MSQ now allows empty ingest queries by default. For such queries, which don't
generate any output rows, the query counters in the async status result
object/task report don't contain numTotalRows and totalSizeInBytes. When these
properties are unset/undefined, they can confuse API clients. For example,
the web-console treats them as unknown values.
This patch fixes the counters by explicitly reporting them as 0 instead of
null for empty ingest queries.
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---
.../druid/msq/util/SqlStatementResourceHelper.java | 20 +--
.../resources/SqlMSQStatementResourcePostTest.java | 60 ++++++-
.../org/apache/druid/msq/test/MSQTestBase.java | 4 +-
.../msq/util/SqlStatementResourceHelperTest.java | 193 +++++++++++++++++----
.../druid/server/router/QueryHostFinder.java | 12 +-
5 files changed, 228 insertions(+), 61 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
index 9481fc60541..b60f81ecca6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
@@ -172,31 +172,22 @@ public class SqlStatementResourceHelper
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap()
.getOrDefault("segmentGenerationProgress", null);
- if (queryCounterSnapshot != null && queryCounterSnapshot instanceof
SegmentGenerationProgressCounter.Snapshot) {
+ if (queryCounterSnapshot instanceof
SegmentGenerationProgressCounter.Snapshot) {
rows += ((SegmentGenerationProgressCounter.Snapshot)
queryCounterSnapshot).getRowsPushed();
}
}
- if (rows != 0L) {
- return Optional.of(ImmutableList.of(new PageInformation(0, rows,
null)));
- } else {
- return Optional.empty();
- }
+ return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else if (msqDestination instanceof TaskReportMSQDestination) {
long rows = 0L;
long size = 0L;
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot =
counterSnapshots.getMap().getOrDefault("output", null);
- if (queryCounterSnapshot != null && queryCounterSnapshot instanceof
ChannelCounters.Snapshot) {
+ if (queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
rows += Arrays.stream(((ChannelCounters.Snapshot)
queryCounterSnapshot).getRows()).sum();
size += Arrays.stream(((ChannelCounters.Snapshot)
queryCounterSnapshot).getBytes()).sum();
}
}
- if (rows != 0L) {
- return Optional.of(ImmutableList.of(new PageInformation(0, rows,
size)));
- } else {
- return Optional.empty();
- }
-
+ return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else if (msqDestination instanceof DurableStorageMSQDestination) {
return populatePagesForDurableStorageDestination(finalStage,
workerCounters);
@@ -221,7 +212,6 @@ public class SqlStatementResourceHelper
throw DruidException.defensive("Expected worker count to be set for
stage[%d]", finalStage);
}
-
List<PageInformation> pages = new ArrayList<>();
for (int partitionNumber = 0; partitionNumber < totalPartitions;
partitionNumber++) {
for (int workerNumber = 0; workerNumber < totalWorkerCount;
workerNumber++) {
@@ -230,7 +220,7 @@ public class SqlStatementResourceHelper
if (workerCounter != null && workerCounter.getMap() != null) {
QueryCounterSnapshot channelCounters =
workerCounter.getMap().get("output");
- if (channelCounters != null && channelCounters instanceof
ChannelCounters.Snapshot) {
+ if (channelCounters instanceof ChannelCounters.Snapshot) {
long rows = 0L;
long size = 0L;
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
index 72e7d345a3d..b50129c73e0 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
@@ -179,6 +179,65 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
);
}
+ @Test
+ public void emptyInsert()
+ {
+ Response response = resource.doPost(new SqlQuery(
+ "insert into foo1 select __time, dim1 , count(*) as cnt from foo
where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1,
2 PARTITIONED by day clustered by dim1",
+ null,
+ false,
+ false,
+ false,
+ ImmutableMap.<String, Object>builder()
+ .putAll(defaultAsyncContext())
+ .build(),
+ null
+ ), SqlStatementResourceTest.makeOkRequest());
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus());
+
+ SqlStatementResult actual = (SqlStatementResult) response.getEntity();
+
+ SqlStatementResult expected = new SqlStatementResult(
+ actual.getQueryId(),
+ SqlStatementState.SUCCESS,
+ MSQTestOverlordServiceClient.CREATED_TIME,
+ null,
+ MSQTestOverlordServiceClient.DURATION,
+ new ResultSetInformation(0L, 0L, null, "foo1", null, null),
+ null
+ );
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void emptyReplace()
+ {
+ Response response = resource.doPost(new SqlQuery(
+ "replace into foo1 overwrite all select __time, dim1 , count(*) as
cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01
00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
+ null,
+ false,
+ false,
+ false,
+ ImmutableMap.<String, Object>builder()
+ .putAll(defaultAsyncContext())
+ .build(),
+ null
+ ), SqlStatementResourceTest.makeOkRequest());
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus());
+
+ SqlStatementResult actual = (SqlStatementResult) response.getEntity();
+
+ SqlStatementResult expected = new SqlStatementResult(
+ actual.getQueryId(),
+ SqlStatementState.SUCCESS,
+ MSQTestOverlordServiceClient.CREATED_TIME,
+ null,
+ MSQTestOverlordServiceClient.DURATION,
+ new ResultSetInformation(0L, 0L, null, "foo1", null, null),
+ null
+ );
+ Assert.assertEquals(expected, actual);
+ }
@Test
public void insertCannotBeEmptyFaultTest()
@@ -433,7 +492,6 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
-
Assert.assertEquals(rows,
SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
null,
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 8891df46f2d..0146fcf9bd8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -1404,8 +1404,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
);
rows.addAll(new
FrameChannelSequence(inputChannelFactory.openChannel(
finalStage.getId(),
- pageInformation.getWorker(),
- pageInformation.getPartition()
+ pageInformation.getWorker() == null ? 0 :
pageInformation.getWorker(),
+ pageInformation.getPartition() == null ? 0 :
pageInformation.getPartition()
)).flatMap(frame -> SqlStatementResourceHelper.getResultSequence(
msqControllerTask,
finalStage,
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 0254a61a2c7..65bd004c9b5 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -23,11 +23,13 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.Frame;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
+import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
@@ -38,6 +40,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,9 +49,6 @@ import java.util.TreeMap;
public class SqlStatementResourceHelperTest
{
-
- private static final Logger log = new
Logger(SqlStatementResourceHelperTest.class);
-
@Test
public void testDistinctPartitionsOnEachWorker()
{
@@ -83,7 +83,7 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
- validatePages(pages.get(), createValidationMap(worker0, worker1, worker2));
+ validatePages(pages.get(), getExpectedPageInformationList(worker0,
worker1, worker2));
}
@Test
@@ -122,7 +122,7 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
- validatePages(pages.get(), createValidationMap(worker0, worker1, worker2));
+ validatePages(pages.get(), getExpectedPageInformationList(worker0,
worker1, worker2));
}
@@ -160,7 +160,7 @@ public class SqlStatementResourceHelperTest
Optional<List<PageInformation>> pages =
SqlStatementResourceHelper.populatePageList(payload,
DurableStorageMSQDestination.instance());
- validatePages(pages.get(), createValidationMap(worker0, worker1, worker2,
worker3));
+ validatePages(pages.get(), getExpectedPageInformationList(worker0,
worker1, worker2, worker3));
}
@@ -200,10 +200,9 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
- validatePages(pages.get(), createValidationMap(worker0, worker1, worker2,
worker3));
+ validatePages(pages.get(), getExpectedPageInformationList(worker0,
worker1, worker2, worker3));
}
-
@Test
public void testConsecutivePartitionsOnEachWorker()
{
@@ -240,41 +239,148 @@ public class SqlStatementResourceHelperTest
payload,
DurableStorageMSQDestination.instance()
);
- validatePages(pages.get(), createValidationMap(worker0, worker1, worker2,
worker3));
+ validatePages(pages.get(), getExpectedPageInformationList(worker0,
worker1, worker2, worker3));
}
+ /**
+ * Durable storage destination applies only to SELECT queries and unlike
ingest queries, emtpy worker counters will not
+ * be reported in this case. See {@link
#testEmptyCountersForTaskReportDestination()} and {@link
#testEmptyCountersForDataSourceDestination()}
+ * to see the difference.
+ */
+ @Test
+ public void testEmptyCountersForDurableStorageDestination()
+ {
+ CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
+ ChannelCounters worker0 = createChannelCounters(new int[0]);
+
+ counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
+
+ MSQTaskReportPayload payload = new MSQTaskReportPayload(
+ new MSQStatusReport(
+ TaskState.SUCCESS,
+ null,
+ new ArrayDeque<>(),
+ null,
+ 0,
+ new HashMap<>(),
+ 1,
+ 2,
+ null
+ ),
+ MSQStagesReport.create(
+ MSQTaskReportTest.QUERY_DEFINITION,
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ ImmutableMap.of(0, 1),
+ ImmutableMap.of(0, 1)
+ ),
+ counterSnapshots,
+ null
+ );
+
+ Optional<List<PageInformation>> pages =
SqlStatementResourceHelper.populatePageList(
+ payload,
+ DurableStorageMSQDestination.instance()
+ );
+ validatePages(pages.get(), getExpectedPageInformationList(worker0));
+ }
- private void validatePages(
- List<PageInformation> pageList,
- Map<Integer, Map<Integer, Pair<Long, Long>>> partitionToWorkerToRowsBytes
- )
+ @Test
+ public void testEmptyCountersForTaskReportDestination()
{
- int currentPage = 0;
- for (Map.Entry<Integer, Map<Integer, Pair<Long, Long>>> partitionWorker :
partitionToWorkerToRowsBytes.entrySet()) {
- for (Map.Entry<Integer, Pair<Long, Long>> workerRowsBytes :
partitionWorker.getValue().entrySet()) {
- PageInformation pageInformation = pageList.get(currentPage);
- Assert.assertEquals(currentPage, pageInformation.getId());
- Assert.assertEquals(workerRowsBytes.getValue().lhs,
pageInformation.getNumRows());
- Assert.assertEquals(workerRowsBytes.getValue().rhs,
pageInformation.getSizeInBytes());
- Assert.assertEquals(partitionWorker.getKey(),
pageInformation.getPartition());
- Assert.assertEquals(workerRowsBytes.getKey(),
pageInformation.getWorker());
- log.debug(pageInformation.toString());
- currentPage++;
- }
- }
- Assert.assertEquals(currentPage, pageList.size());
+ CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
+ counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
+
+ MSQTaskReportPayload payload = new MSQTaskReportPayload(
+ new MSQStatusReport(
+ TaskState.SUCCESS,
+ null,
+ new ArrayDeque<>(),
+ null,
+ 0,
+ new HashMap<>(),
+ 1,
+ 2,
+ null
+ ),
+ MSQStagesReport.create(
+ MSQTaskReportTest.QUERY_DEFINITION,
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ ImmutableMap.of(0, 1),
+ ImmutableMap.of(0, 1)
+ ),
+ counterSnapshots,
+ null
+ );
+
+ Optional<List<PageInformation>> pages =
SqlStatementResourceHelper.populatePageList(
+ payload,
+ TaskReportMSQDestination.instance()
+ );
+ Assert.assertTrue(pages.isPresent());
+ Assert.assertEquals(1, pages.get().size());
+ Assert.assertEquals(new PageInformation(0, 0L, 0L), pages.get().get(0));
+ }
+
+ @Test
+ public void testEmptyCountersForDataSourceDestination()
+ {
+ CounterSnapshotsTree counterSnapshots = new CounterSnapshotsTree();
+ counterSnapshots.put(0, 0, new CounterSnapshots(ImmutableMap.of()));
+
+ MSQTaskReportPayload payload = new MSQTaskReportPayload(
+ new MSQStatusReport(
+ TaskState.SUCCESS,
+ null,
+ new ArrayDeque<>(),
+ null,
+ 0,
+ new HashMap<>(),
+ 1,
+ 2,
+ null
+ ),
+ MSQStagesReport.create(
+ MSQTaskReportTest.QUERY_DEFINITION,
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ ImmutableMap.of(0, 1),
+ ImmutableMap.of(0, 1)
+ ),
+ counterSnapshots,
+ null
+ );
+
+ Optional<List<PageInformation>> pages =
SqlStatementResourceHelper.populatePageList(
+ payload,
+ new DataSourceMSQDestination(
+ "test",
+ Granularities.DAY,
+ null,
+ null
+ )
+ );
+ Assert.assertTrue(pages.isPresent());
+ Assert.assertEquals(1, pages.get().size());
+ Assert.assertEquals(new PageInformation(0, 0L, null), pages.get().get(0));
}
- private Map<Integer, Map<Integer, Pair<Long, Long>>> createValidationMap(
- ChannelCounters... workers
- )
+ private void validatePages(List<PageInformation> actualPageList,
List<PageInformation> expectedPageList)
{
- if (workers == null || workers.length == 0) {
- return new HashMap<>();
+ Assert.assertEquals(expectedPageList.size(), actualPageList.size());
+ Assert.assertEquals(expectedPageList, actualPageList);
+ }
+
+ private List<PageInformation>
getExpectedPageInformationList(ChannelCounters... workerCounters)
+ {
+ List<PageInformation> pageInformationList = new ArrayList<>();
+ if (workerCounters == null || workerCounters.length == 0) {
+ return pageInformationList;
} else {
Map<Integer, Map<Integer, Pair<Long, Long>>>
partitionToWorkerToRowsBytes = new TreeMap<>();
- for (int worker = 0; worker < workers.length; worker++) {
- ChannelCounters.Snapshot workerCounter = workers[worker].snapshot();
+ for (int worker = 0; worker < workerCounters.length; worker++) {
+ ChannelCounters.Snapshot workerCounter =
workerCounters[worker].snapshot();
for (int partition = 0; workerCounter != null && partition <
workerCounter.getRows().length; partition++) {
Map<Integer, Pair<Long, Long>> workerMap =
partitionToWorkerToRowsBytes.computeIfAbsent(
partition,
@@ -290,14 +396,27 @@ public class SqlStatementResourceHelperTest
)
);
}
+ }
+ }
+ // Construct the pages based on the order of partitionToWorkerMap.
+ for (Map.Entry<Integer, Map<Integer, Pair<Long, Long>>>
partitionToWorkerMap : partitionToWorkerToRowsBytes.entrySet()) {
+ for (Map.Entry<Integer, Pair<Long, Long>> workerToRowsBytesMap :
partitionToWorkerMap.getValue().entrySet()) {
+ pageInformationList.add(
+ new PageInformation(
+ pageInformationList.size(),
+ workerToRowsBytesMap.getValue().lhs,
+ workerToRowsBytesMap.getValue().rhs,
+ workerToRowsBytesMap.getKey(),
+ partitionToWorkerMap.getKey()
+ )
+ );
}
}
- return partitionToWorkerToRowsBytes;
+ return pageInformationList;
}
}
-
private ChannelCounters createChannelCounters(int[] partitions)
{
if (partitions == null || partitions.length == 0) {
diff --git
a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
index 3c15e4b8554..59251ab4c70 100644
--- a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
+++ b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
@@ -76,12 +76,12 @@ public class QueryHostFinder
Server chosenServer =
avaticaConnectionBalancer.pickServer(getAllServers(), connectionId);
assertServerFound(
chosenServer,
- "No server found for Avatica request with connectionId [%s]",
+ "No server found for Avatica request with connectionId[%s]",
connectionId
);
log.debug(
- "Balancer class [%s] sending request with connectionId [%s] to server:
%s",
+ "Balancer class[%s] sending request with connectionId[%s] to
server[%s]",
avaticaConnectionBalancer.getClass(),
connectionId,
chosenServer.getHost()
@@ -120,7 +120,7 @@ public class QueryHostFinder
Server server = findDefaultServer();
assertServerFound(
server,
- "There are no available brokers. Please check that your brokers are
running and " + " healthy."
+ "There are no available brokers. Please check that your brokers are
running and healthy."
);
return server;
}
@@ -136,7 +136,7 @@ public class QueryHostFinder
if (server == null) {
log.error(
- "No server found for serviceName [%s]. Using backup",
+ "No server found for serviceName[%s]. Using backup",
serviceName
);
@@ -144,7 +144,7 @@ public class QueryHostFinder
if (server == null) {
log.error(
- "No backup found for serviceName [%s]. Using default [%s]",
+ "No backup found for serviceName[%s]. Using default[%s]",
serviceName,
hostSelector.getDefaultServiceName()
);
@@ -162,7 +162,7 @@ public class QueryHostFinder
private void assertServerFound(Server server, String messageFormat,
Object... args)
{
if (server != null) {
- log.debug("Selected [%s]", server.getHost());
+ log.debug("Selected server[%s]", server.getHost());
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]