gianm commented on code in PR #18235:
URL: https://github.com/apache/druid/pull/18235#discussion_r2202321523
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java:
##########
@@ -78,6 +82,31 @@ public String runDartSql(String sql, Object... args)
).trim();
}
+ public void runDartWithError(String sql, Matcher<Throwable> matcher, Object... args)
Review Comment:
Instead of this method, use `assertThrows` with `runDartSql` in the test
case.
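A minimal sketch of the shape I have in mind. To keep it self-contained, `runDartSql` here is a hypothetical stub that always fails, and `assertThrows` is a small local stand-in for JUnit 5's `Assertions.assertThrows`; in the real test you would call the JUnit method and the existing `EmbeddedMSQApis#runDartSql`.

```java
final class AssertThrowsSketch
{
  // Hypothetical stub standing in for EmbeddedMSQApis#runDartSql; it always fails.
  static String runDartSql(String sql, Object... args)
  {
    throw new IllegalStateException("simulated failure for: " + String.format(sql, args));
  }

  // Local stand-in for JUnit 5's Assertions.assertThrows, so the sketch needs no dependencies.
  static <T extends Throwable> T assertThrows(Class<T> expectedType, Runnable executable)
  {
    try {
      executable.run();
    }
    catch (Throwable t) {
      if (expectedType.isInstance(t)) {
        return expectedType.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
  }

  public static void main(String[] args)
  {
    // The test case asserts on the failure directly; no error-specific API method is needed.
    final IllegalStateException e = assertThrows(
        IllegalStateException.class,
        () -> runDartSql("SELECT * FROM \"%s\"", "my_datasource")
    );
    System.out.println(e.getMessage());
  }
}
```

The returned exception can then be checked with whatever matcher the test case needs.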
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java:
##########
@@ -195,7 +200,35 @@ public <RowType, QueryType> DataServerQueryResult<RowType> fetchRowsFromDataServ
}
}
- return new DataServerQueryResult<>(yielders, handedOffSegments, dataSource);
+ return new DataServerQueryResult<>(yielders, handedOffSegments, dataSourceName);
+ }
+
+ /**
+ * Transform the {@link InputNumberDataSource}, which are only understood by MSQ tasks, back into a
+ * {@link TableDataSource}.
+ */
+ private DataSource transformDatasource(DataSource dataSource)
+ {
+ if (dataSource instanceof InputNumberDataSource) {
+ InputNumberDataSource numberDataSource = (InputNumberDataSource) dataSource;
+ if (numberDataSource.getInputNumber() == inputNumber) {
+ return new TableDataSource(dataSourceName);
+ } else {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Unknown InputNumberDataSource datasource with number[%s]. Queries with realtime sources "
Review Comment:
This is an end-user-facing error message, so it shouldn't start with `Unknown
InputNumberDataSource datasource with number[1].`; that doesn't mean much to an
end user. The term `stage outputs` isn't very clear either. Please rewrite it
to be more useful to the end user. It should describe concepts they would be
familiar with from the SQL side, such as mentioning that broadcast joins are
not currently supported with realtime data.
Also, could this situation (an `InputNumberDataSource` for some non-base
datasource) happen in any case other than joins? If so, the error message
should reflect that. Maybe unions?
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java:
##########
@@ -130,4 +159,33 @@ public MSQTaskReportPayload runTaskSql(String sql, Object... args)
return taskReportPayload;
}
+
+ public void expectTaskFailure(String sql, String reason, Object... args)
Review Comment:
Instead of this, break out the task submit part from `runTaskSql` and call
it `submitTaskSql`. It should return `SqlTaskStatus`. The idea is that
generally we want the methods to be composable rather than super-specific to a
use case. With `submitTaskSql` broken out, the test case could call that
followed by `cluster.callApi().waitForTaskToFail` -- or anything else it wants
to do.
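Roughly what I mean by composable, as a self-contained sketch. Everything here is a hypothetical stand-in: `SubmittedTask` stands in for `SqlTaskStatus`, and the two wait methods only return strings instead of polling the Overlord.

```java
final class ComposableApisSketch
{
  // Hypothetical stand-in for SqlTaskStatus; only the task ID is modeled.
  static final class SubmittedTask
  {
    final String taskId;

    SubmittedTask(String taskId)
    {
      this.taskId = taskId;
    }
  }

  // The broken-out submit step: formats the SQL, "submits" it, and returns without waiting.
  static SubmittedTask submitTaskSql(String sql, Object... args)
  {
    return new SubmittedTask("task[" + String.format(sql, args) + "]");
  }

  // Stand-ins for the wait helpers; the real ones would poll the Overlord.
  static String waitForTaskToSucceed(String taskId)
  {
    return "succeeded: " + taskId;
  }

  static String waitForTaskToFail(String taskId)
  {
    return "failed: " + taskId;
  }

  // runTaskSql becomes a thin composition of submit + wait.
  static String runTaskSql(String sql, Object... args)
  {
    return waitForTaskToSucceed(submitTaskSql(sql, args).taskId);
  }

  public static void main(String[] args)
  {
    // Success path keeps using the convenience method.
    System.out.println(runTaskSql("SELECT %s", 1));

    // Failure path composes the same submit step with a different wait.
    final SubmittedTask task = submitTaskSql("SELECT %s", 2);
    System.out.println(waitForTaskToFail(task.taskId));
  }
}
```

With this split, a failure test needs no dedicated API method; it reuses the submit step and picks the wait it wants.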
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartControllerModule;
+import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
+import org.apache.druid.msq.guice.MSQDurableStorageModule;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.guice.MSQSqlModule;
+import org.apache.druid.msq.guice.SqlTaskModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.Period;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class EmbeddedMSQRealtimeUnnestQueryTest extends EmbeddedClusterTestBase
Review Comment:
Now that there are two of these, we should think about how to simplify the
test cases and reduce duplicate code. It seems there's a lot of code copied
from `EmbeddedMSQRealtimeQueryTest`, with some small differences, like which
dataset is being loaded.
The simplest approach would be to make a `BaseRealtimeQueryTest` that lets
subclasses define the dataset and provides everything else. Another approach
could be to add helper functions for loading the dataset, setting up
supervisors, etc., to `EmbeddedClusterApis`. @kfaraz - wondering if you have
thoughts on this?
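To illustrate the first approach, here is a stripped-down sketch. The class names, the `rowsToPublish` hook, and the row contents are all hypothetical; the real base class would extend `EmbeddedClusterTestBase` and do the Kafka topic, supervisor, and wait-for-ingestion work in the shared setup.

```java
import java.util.List;

abstract class BaseRealtimeQueryTestSketch
{
  // The only thing a subclass has to define: which rows get published.
  protected abstract List<String> rowsToPublish();

  // Shared setup. In the real test this would create the topic, submit the supervisor,
  // publish the rows, and wait for them to be ingested. Here it only counts the rows.
  final int setUpEach()
  {
    final List<String> rows = rowsToPublish();
    return rows.size();
  }

  public static void main(String[] args)
  {
    System.out.println(new PlainQueryTestSketch().setUpEach());
    System.out.println(new UnnestQueryTestSketch().setUpEach());
  }
}

// Hypothetical subclass for the existing realtime test.
final class PlainQueryTestSketch extends BaseRealtimeQueryTestSketch
{
  @Override
  protected List<String> rowsToPublish()
  {
    return List.of("{\"page\":\"a\"}", "{\"page\":\"b\"}", "{\"page\":\"c\"}");
  }
}

// Hypothetical subclass for the unnest test; only the dataset differs.
final class UnnestQueryTestSketch extends BaseRealtimeQueryTestSketch
{
  @Override
  protected List<String> rowsToPublish()
  {
    return List.of("{\"placementish\":[\"a\",\"preferred\"]}", "{\"placementish\":[\"b\",\"preferred\"]}");
  }
}
```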
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java:
##########
@@ -195,7 +200,35 @@ public <RowType, QueryType> DataServerQueryResult<RowType> fetchRowsFromDataServ
}
}
- return new DataServerQueryResult<>(yielders, handedOffSegments, dataSource);
+ return new DataServerQueryResult<>(yielders, handedOffSegments, dataSourceName);
+ }
+
+ /**
+ * Transform the {@link InputNumberDataSource}, which are only understood by MSQ tasks, back into a
+ * {@link TableDataSource}.
+ */
+ private DataSource transformDatasource(DataSource dataSource)
+ {
+ if (dataSource instanceof InputNumberDataSource) {
+ InputNumberDataSource numberDataSource = (InputNumberDataSource) dataSource;
+ if (numberDataSource.getInputNumber() == inputNumber) {
+ return new TableDataSource(dataSourceName);
+ } else {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
Review Comment:
I think category `UNSUPPORTED` would be more appropriate.
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java:
##########
@@ -141,12 +163,38 @@ public EmbeddedDruidCluster createCluster()
.addResource(kafka)
.addServer(coordinator)
.addServer(overlord)
- .addServer(indexer)
- .addServer(broker)
- .addServer(historical)
.addServer(router);
}
+ @BeforeAll
+ @Override
+ protected void setup() throws Exception
+ {
+ cluster = createCluster();
+ cluster.start();
Review Comment:
This appears to be copied from the superclass; you can replace it with
`super.setup()`
##########
services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java:
##########
@@ -116,6 +116,20 @@ public void waitForTaskToSucceed(String taskId, EmbeddedOverlord overlord)
verifyTaskHasStatus(taskId, TaskStatus.success(taskId));
}
+ /**
+ * Waits for the given task to fail, with the given errorMsg. If the given
+ * {@link EmbeddedOverlord} is not the leader, this method can only return by
+ * throwing an exception upon timeout.
+ */
+ public void waitForTaskToFail(String taskId, String errorMsg, EmbeddedOverlord overlord)
Review Comment:
Better for this to take a `Matcher<String>` as the last argument, so it
would be called like:
```java
apis.waitForTaskToFail(taskId, overlord, CoreMatchers.equalTo("xyz"));
```
That way any kind of matching can be done, not just equality.
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartControllerModule;
+import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
+import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
+import org.apache.druid.msq.guice.MSQDurableStorageModule;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.guice.MSQSqlModule;
+import org.apache.druid.msq.guice.SqlTaskModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.Period;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class EmbeddedMSQRealtimeUnnestQueryTest extends EmbeddedClusterTestBase
+{
+ private static final Period TASK_DURATION = Period.hours(1);
+ private static final int TASK_COUNT = 2;
+ private final EmbeddedBroker broker = new EmbeddedBroker();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private final EmbeddedRouter router = new EmbeddedRouter();
+
+ private KafkaResource kafka;
+ private String topic;
+ private EmbeddedMSQApis msqApis;
+
+ @Override
+ public EmbeddedDruidCluster createCluster()
+ {
+ kafka = new KafkaResource();
+
+ coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");
+
+ overlord.addProperty("druid.manager.segments.useIncrementalCache", "always")
+ .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+ broker.addProperty("druid.msq.dart.controller.heapFraction", "0.9")
+ .addProperty("druid.query.default.context.maxConcurrentStages", "1");
+
+ historical.addProperty("druid.msq.dart.worker.heapFraction", "0.9")
+ .addProperty("druid.msq.dart.worker.concurrentQueries", "1")
+ .addProperty("druid.lookup.enableLookupSyncOnStartup", "true");
+
+ indexer.setServerMemory(300_000_000) // to run 2x realtime and 2x MSQ tasks
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+ // druid.processing.numThreads must be higher than # of MSQ tasks to avoid contention, because the realtime
+ // server is contacted in such a way that the processing thread is blocked
+ .addProperty("druid.processing.numThreads", "3")
+ .addProperty("druid.worker.capacity", "4")
+ .addProperty("druid.lookup.enableLookupSyncOnStartup", "true");
+
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .addExtensions(
+ KafkaIndexTaskModule.class,
+ DartControllerModule.class,
+ DartWorkerModule.class,
+ DartControllerMemoryManagementModule.class,
+ DartWorkerMemoryManagementModule.class,
+ IndexerMemoryManagementModule.class,
+ MSQDurableStorageModule.class,
+ MSQIndexingModule.class,
+ MSQSqlModule.class,
+ SqlTaskModule.class
+ )
+ .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
+ .addCommonProperty("druid.msq.dart.enabled", "true")
+ .useLatchableEmitter()
+ .addResource(kafka)
+ .addServer(coordinator)
+ .addServer(overlord)
+ .addServer(router)
+ .addServer(broker)
+ .addServer(historical)
+ .addServer(indexer);
+ }
+
+ @BeforeEach
+ void setUpEach()
+ {
+ msqApis = new EmbeddedMSQApis(cluster, overlord);
+ topic = dataSource = EmbeddedClusterApis.createTestDatasourceName();
+
+ // Create Kafka topic.
+ kafka.createTopicWithPartitions(topic, 2);
+
+ // Submit a supervisor.
+ final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor();
+ final Map<String, String> startSupervisorResult =
+ cluster.callApi().onLeaderOverlord(o -> o.postSupervisor(kafkaSupervisorSpec));
+ Assertions.assertEquals(Map.of("id", dataSource), startSupervisorResult);
+
+ // Send data to Kafka.
+ final QueryableIndexCursorFactory cursorFactory =
+ new QueryableIndexCursorFactory(TestIndex.getMMappedTestIndex());
+ final RowSignature signature = cursorFactory.getRowSignature();
+ kafka.produceRecordsToTopic(
+ FrameTestUtil.readRowsFromCursorFactory(cursorFactory)
+ .map(row -> {
+ final Map<String, Object> rowMap = new LinkedHashMap<>();
+ for (int i = 0; i < row.size(); i++) {
+ rowMap.put(signature.getColumnName(i), row.get(i));
+ }
+ try {
+ return new ProducerRecord<>(
+ topic,
+ ByteArrays.EMPTY_ARRAY,
+ TestHelper.JSON_MAPPER.writeValueAsBytes(rowMap)
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .toList()
+ );
+
+ final int totalRows = TestIndex.getMMappedTestIndex().getNumRows();
+ // Wait for it to be loaded.
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/events/processed")
+ .hasDimension(DruidMetrics.DATASOURCE, Collections.singletonList(dataSource)),
+ agg -> agg.hasSumAtLeast(totalRows)
+ );
+ }
+
+ @AfterEach
+ void tearDownEach() throws ExecutionException, InterruptedException, IOException
+ {
+ final Map<String, String> terminateSupervisorResult =
+ cluster.callApi().onLeaderOverlord(o -> o.terminateSupervisor(dataSource));
+ Assertions.assertEquals(Map.of("id", dataSource), terminateSupervisorResult);
+
+ // Cancel all running tasks, so we don't need to wait for them to hand off their segments.
+ try (final CloseableIterator<TaskStatusPlus> it = cluster.leaderOverlord().taskStatuses(null, null, null).get()) {
+ while (it.hasNext()) {
+ cluster.leaderOverlord().cancelTask(it.next().getId());
+ }
+ }
+
+ kafka.deleteTopic(topic);
+ }
+
+ @Test
+ @Timeout(60)
+ public void test_unnest_task_withRealtime()
+ {
+ final String sql = StringUtils.format(
+ "SET includeSegmentSource = 'REALTIME';\n"
+ + "SELECT d3 FROM \"%s\" CROSS JOIN UNNEST(MV_TO_ARRAY(\"placementish\")) AS d3\n"
+ + "LIMIT 5",
+ dataSource
+ );
+ final MSQTaskReportPayload payload = msqApis.runTaskSql(sql);
+
+ // By default tasks do not include realtime data; count is zero.
Review Comment:
This comment seems incorrect: the query above sets `includeSegmentSource =
'REALTIME'`, so it does include realtime data.