This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 187e21afaea Add `BrokerClient` implementation (#17382)
187e21afaea is described below
commit 187e21afaea8495d94ed780254beef9b12536757
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Oct 21 11:05:53 2024 -0700
Add `BrokerClient` implementation (#17382)
This patch is extracted from PR 17353.
Changes:
- Added BrokerClient and BrokerClientImpl to the sql package; they leverage
the ServiceClient functionality, similar to the OverlordClient and
CoordinatorClient implementations in the server module.
- For now, only two broker API stubs are added: submitSqlTask() and
fetchExplainPlan().
- Added a new POJO class ExplainPlan that encapsulates explain plan info.
- Deprecated org.apache.druid.discovery.BrokerClient in favor of the new
BrokerClient in this patch.
- Cleaned up ExplainAttributesTest a bit and added serde verification.
---
.../org/apache/druid/discovery/BrokerClient.java | 4 +
.../druid/rpc/guice/ServiceClientModule.java | 15 ++-
.../druid/rpc/guice/ServiceClientModuleTest.java | 99 ++++++++++++++
.../sql/calcite/planner/ExplainAttributes.java | 26 ++++
.../java/org/apache/druid/sql/client/Broker.java | 34 +++++
.../org/apache/druid/sql/client/BrokerClient.java | 51 +++++++
.../apache/druid/sql/client/BrokerClientImpl.java | 84 ++++++++++++
.../druid/sql/guice/BrokerServiceModule.java | 91 +++++++++++++
.../org/apache/druid/sql/http/ExplainPlan.java | 117 ++++++++++++++++
.../sql/calcite/planner/ExplainAttributesTest.java | 81 +++++++----
.../druid/sql/client/BrokerClientImplTest.java | 148 +++++++++++++++++++++
.../druid/sql/guice/BrokerServiceModuleTest.java | 114 ++++++++++++++++
.../org/apache/druid/sql/http/ExplainPlanTest.java | 122 +++++++++++++++++
13 files changed, 953 insertions(+), 33 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
index bdee9b8dfe4..a0ddbf42bed 100644
--- a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.rpc.ServiceClient;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -41,7 +42,10 @@ import java.util.concurrent.ExecutionException;
/**
* This class facilitates interaction with Broker.
+ * Note that this should be removed and reconciled with
org.apache.druid.sql.client.BrokerClient, which has the
+ * built-in functionality of {@link ServiceClient}, and proper Guice and
service discovery wired in.
*/
+@Deprecated
public class BrokerClient
{
private static final int MAX_RETRIES = 5;
diff --git
a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
index 51dd2b89d73..94dfeb29d95 100644
--- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
+++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
@@ -47,8 +47,8 @@ import java.util.concurrent.ScheduledExecutorService;
public class ServiceClientModule implements DruidModule
{
+ public static final int CLIENT_MAX_ATTEMPTS = 6;
private static final int CONNECT_EXEC_THREADS = 4;
- private static final int CLIENT_MAX_ATTEMPTS = 6;
@Override
public void configure(Binder binder)
@@ -59,11 +59,9 @@ public class ServiceClientModule implements DruidModule
@Provides
@LazySingleton
@EscalatedGlobal
- public ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal final
HttpClient httpClient)
+ public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final
HttpClient httpClient)
{
- final ScheduledExecutorService connectExec =
- ScheduledExecutors.fixed(CONNECT_EXEC_THREADS,
"ServiceClientFactory-%d");
- return new ServiceClientFactoryImpl(httpClient, connectExec);
+ return makeServiceClientFactory(httpClient);
}
@Provides
@@ -117,4 +115,11 @@ public class ServiceClientModule implements DruidModule
jsonMapper
);
}
+
+ public static ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal
final HttpClient httpClient)
+ {
+ final ScheduledExecutorService connectExec =
+ ScheduledExecutors.fixed(CONNECT_EXEC_THREADS,
"ServiceClientFactory-%d");
+ return new ServiceClientFactoryImpl(httpClient, connectExec);
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
new file mode 100644
index 00000000000..5d8a07d20a8
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.rpc.guice;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import static org.junit.Assert.assertNotNull;
+
+public class ServiceClientModuleTest
+{
+ private Injector injector;
+
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock
+ private HttpClient httpClient;
+
+ @Mock
+ private DruidNodeDiscoveryProvider discoveryProvider;
+
+ @Mock
+ private ServiceLocator serviceLocator;
+
+ @Mock
+ private ServiceClientFactory serviceClientFactory;
+
+ @Before
+ public void setUp()
+ {
+ injector = Guice.createInjector(
+ ImmutableList.of(
+ new DruidGuiceExtensions(),
+ new LifecycleModule(),
+ new JacksonModule(),
+ new ServiceClientModule(),
+ binder -> {
+
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
+ binder.bind(ServiceLocator.class).toInstance(serviceLocator);
+
binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider);
+
binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory);
+ }
+ )
+ );
+ }
+
+ @Test
+ public void testGetServiceClientFactory()
+ {
+ assertNotNull(injector.getInstance(ServiceClientFactory.class));
+ }
+
+ @Test
+ public void testGetOverlordClient()
+ {
+ assertNotNull(injector.getInstance(OverlordClient.class));
+ }
+
+ @Test
+ public void testGetCoordinatorClient()
+ {
+ assertNotNull(injector.getInstance(CoordinatorClient.class));
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
index e2ae4fa7a10..533de7d58f2 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
@@ -19,12 +19,14 @@
package org.apache.druid.sql.calcite.planner;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Objects;
/**
* ExplainAttributes holds the attributes of a SQL statement that is used in
the EXPLAIN PLAN result.
@@ -45,6 +47,7 @@ public final class ExplainAttributes
@Nullable
private final String replaceTimeChunks;
+ @JsonCreator
public ExplainAttributes(
@JsonProperty("statementType") final String statementType,
@JsonProperty("targetDataSource") @Nullable final String
targetDataSource,
@@ -117,6 +120,29 @@ public final class ExplainAttributes
return replaceTimeChunks;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExplainAttributes that = (ExplainAttributes) o;
+ return Objects.equals(statementType, that.statementType)
+ && Objects.equals(targetDataSource, that.targetDataSource)
+ && Objects.equals(partitionedBy, that.partitionedBy)
+ && Objects.equals(clusteredBy, that.clusteredBy)
+ && Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(statementType, targetDataSource, partitionedBy,
clusteredBy, replaceTimeChunks);
+ }
+
@Override
public String toString()
{
diff --git a/sql/src/main/java/org/apache/druid/sql/client/Broker.java
b/sql/src/main/java/org/apache/druid/sql/client/Broker.java
new file mode 100644
index 00000000000..fb20c5166c8
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/Broker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Broker
+{
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
new file mode 100644
index 00000000000..14cbbb7bff6
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.sql.http.ExplainPlan;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+
+import java.util.List;
+
+/**
+ * High-level Broker client.
+ * <p>
+ * All methods return futures, enabling asynchronous logic. If you want a
synchronous response, use
+ * {@code FutureUtils.get} or {@code FutureUtils.getUnchecked}.
+ * Futures resolve to exceptions in the manner described by {@link
org.apache.druid.rpc.ServiceClient#asyncRequest}.
+ * </p>
+ * Typically acquired via Guice, where it is registered using {@link
org.apache.druid.sql.guice.BrokerServiceModule}.
+ */
+public interface BrokerClient
+{
+ /**
+ * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint.
+ */
+ ListenableFuture<SqlTaskStatus> submitSqlTask(SqlQuery sqlQuery);
+
+ /**
+ * Fetches the explain plan for the given {@code sqlQuery} from the Broker's
SQL task endpoint.
+ *
+ * @param sqlQuery the SQL query for which the {@code EXPLAIN PLAN FOR}
information is to be fetched
+ */
+ ListenableFuture<List<ExplainPlan>> fetchExplainPlan(SqlQuery sqlQuery);
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
new file mode 100644
index 00000000000..b3e064341e6
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.sql.http.ExplainPlan;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.List;
+
+public class BrokerClientImpl implements BrokerClient
+{
+ private final ServiceClient client;
+ private final ObjectMapper jsonMapper;
+
+ public BrokerClientImpl(final ServiceClient client, final ObjectMapper
jsonMapper)
+ {
+ this.client = client;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+ public ListenableFuture<SqlTaskStatus> submitSqlTask(final SqlQuery sqlQuery)
+ {
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+ .jsonContent(jsonMapper, sqlQuery),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(),
SqlTaskStatus.class)
+ );
+ }
+
+ @Override
+ public ListenableFuture<List<ExplainPlan>> fetchExplainPlan(final SqlQuery
sqlQuery)
+ {
+ final SqlQuery explainSqlQuery = new SqlQuery(
+ StringUtils.format("EXPLAIN PLAN FOR %s", sqlQuery.getQuery()),
+ null,
+ false,
+ false,
+ false,
+ null,
+ null
+ );
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+ .jsonContent(jsonMapper, explainSqlQuery),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new
TypeReference<List<ExplainPlan>>() {})
+ );
+ }
+}
+
diff --git
a/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
new file mode 100644
index 00000000000..05e022f8310
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.rpc.DiscoveryServiceLocator;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.rpc.guice.ServiceClientModule;
+import org.apache.druid.sql.client.Broker;
+import org.apache.druid.sql.client.BrokerClient;
+import org.apache.druid.sql.client.BrokerClientImpl;
+
+/**
+ * Module that processes can install if they require a {@link BrokerClient}.
+ * <p>
+ * Similar to {@link ServiceClientModule}, but since {@link BrokerClient}
depends
+ * on classes from the sql module, this is a separate module within the sql
package.
+ * </p>
+ */
+public class BrokerServiceModule implements DruidModule
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ // Nothing to do.
+ }
+
+ @Provides
+ @LazySingleton
+ @EscalatedGlobal
+ public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final
HttpClient httpClient)
+ {
+ return ServiceClientModule.makeServiceClientFactory(httpClient);
+ }
+
+ @Provides
+ @ManageLifecycle
+ @Broker
+ public ServiceLocator makeBrokerServiceLocator(final
DruidNodeDiscoveryProvider discoveryProvider)
+ {
+ return new DiscoveryServiceLocator(discoveryProvider, NodeRole.BROKER);
+ }
+
+ @Provides
+ @LazySingleton
+ public BrokerClient makeBrokerClient(
+ @Json final ObjectMapper jsonMapper,
+ @EscalatedGlobal final ServiceClientFactory clientFactory,
+ @Broker final ServiceLocator serviceLocator
+ )
+ {
+ return new BrokerClientImpl(
+ clientFactory.makeClient(
+ NodeRole.BROKER.getJsonName(),
+ serviceLocator,
+
StandardRetryPolicy.builder().maxAttempts(ServiceClientModule.CLIENT_MAX_ATTEMPTS).build()
+ ),
+ jsonMapper
+ );
+ }
+}
+
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
new file mode 100644
index 00000000000..68defc4b2a4
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Class that encapsulates the information of a single plan for an {@code
EXPLAIN PLAN FOR} query.
+ * <p>
+ * Similar to {@link #getAttributes()}, it's possible to provide more
structure to {@link #getPlan()},
+ * at least for the native query explain, but there's currently no use case
for it.
+ * </p>
+ */
+public class ExplainPlan
+{
+ @JsonProperty("PLAN")
+ private final String plan;
+
+ @JsonProperty("RESOURCES")
+ private final String resources;
+
+ @JsonProperty("ATTRIBUTES")
+ @JsonDeserialize(using = ExplainAttributesDeserializer.class)
+ private final ExplainAttributes attributes;
+
+ @JsonCreator
+ public ExplainPlan(
+ @JsonProperty("PLAN") final String plan,
+ @JsonProperty("RESOURCES") final String resources,
+ @JsonProperty("ATTRIBUTES") final ExplainAttributes attributes
+ )
+ {
+ this.plan = plan;
+ this.resources = resources;
+ this.attributes = attributes;
+ }
+
+ public String getPlan()
+ {
+ return plan;
+ }
+
+ public String getResources()
+ {
+ return resources;
+ }
+
+ public ExplainAttributes getAttributes()
+ {
+ return attributes;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExplainPlan that = (ExplainPlan) o;
+ return Objects.equals(plan, that.plan)
+ && Objects.equals(resources, that.resources)
+ && Objects.equals(attributes, that.attributes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(plan, resources, attributes);
+ }
+
+ /**
+ * Custom deserializer for {@link ExplainAttributes} because the value for
{@link #attributes} in the plan
+ * is encoded as a JSON string. This deserializer tells Jackson how to
parse the JSON string
+ * and map it to the fields in the {@link ExplainAttributes} class.
+ */
+ private static class ExplainAttributesDeserializer extends
JsonDeserializer<ExplainAttributes>
+ {
+ @Override
+ public ExplainAttributes deserialize(JsonParser jsonParser,
DeserializationContext context) throws IOException
+ {
+ final ObjectMapper objectMapper = (ObjectMapper) jsonParser.getCodec();
+ return objectMapper.readValue(jsonParser.getText(),
ExplainAttributes.class);
+ }
+ }
+}
+
+
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
index 53e2abf2749..d203dd34002 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
@@ -19,8 +19,8 @@
package org.apache.druid.sql.calcite.planner;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.junit.Assert;
@@ -28,14 +28,16 @@ import org.junit.Test;
import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+
public class ExplainAttributesTest
{
- private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new
DefaultObjectMapper();
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Test
- public void testSimpleGetters()
+ public void testGetters()
{
- ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", null,
null, null, null);
+ final ExplainAttributes selectAttributes = new ExplainAttributes("SELECT",
null, null, null, null);
Assert.assertEquals("SELECT", selectAttributes.getStatementType());
Assert.assertNull(selectAttributes.getTargetDataSource());
Assert.assertNull(selectAttributes.getPartitionedBy());
@@ -44,9 +46,9 @@ public class ExplainAttributesTest
}
@Test
- public void testSerializeSelectAttributes() throws JsonProcessingException
+ public void testSerdeOfSelectAttributes()
{
- ExplainAttributes selectAttributes = new ExplainAttributes(
+ final ExplainAttributes selectAttributes = new ExplainAttributes(
"SELECT",
null,
null,
@@ -56,13 +58,14 @@ public class ExplainAttributesTest
final String expectedAttributes = "{"
+ "\"statementType\":\"SELECT\""
+ "}";
- Assert.assertEquals(expectedAttributes,
DEFAULT_OBJECT_MAPPER.writeValueAsString(selectAttributes));
+
+ testSerde(selectAttributes, expectedAttributes);
}
@Test
- public void testSerializeInsertAttributes() throws JsonProcessingException
+ public void testSerdeOfInsertAttributes()
{
- ExplainAttributes insertAttributes = new ExplainAttributes(
+ final ExplainAttributes insertAttributes = new ExplainAttributes(
"INSERT",
"foo",
Granularities.DAY,
@@ -74,13 +77,13 @@ public class ExplainAttributesTest
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"DAY\""
+ "}";
- Assert.assertEquals(expectedAttributes,
DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
+ testSerde(insertAttributes, expectedAttributes);
}
@Test
- public void testSerializeInsertAllAttributes() throws JsonProcessingException
+ public void testSerdeOfInsertAllAttributes()
{
- ExplainAttributes insertAttributes = new ExplainAttributes(
+ final ExplainAttributes insertAttributes = new ExplainAttributes(
"INSERT",
"foo",
Granularities.ALL,
@@ -92,78 +95,100 @@ public class ExplainAttributesTest
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":{\"type\":\"all\"}"
+ "}";
- Assert.assertEquals(expectedAttributes,
DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
+ testSerde(insertAttributes, expectedAttributes);
}
@Test
- public void testSerializeReplaceAttributes() throws JsonProcessingException
+ public void testSerdeOfReplaceAttributes()
{
- ExplainAttributes replaceAttributes1 = new ExplainAttributes(
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
null,
"ALL"
);
- final String expectedAttributes1 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"replaceTimeChunks\":\"ALL\""
+ "}";
- Assert.assertEquals(expectedAttributes1,
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
+ testSerde(replaceAttributes, expectedAttributes);
+ }
- ExplainAttributes replaceAttributes2 = new ExplainAttributes(
+ @Test
+ public void testSerdeOfReplaceAttributesWithTimeChunks()
+ {
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
null,
"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
);
- final String expectedAttributes2 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+
"\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
+ "}";
- Assert.assertEquals(expectedAttributes2,
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
+ testSerde(replaceAttributes, expectedAttributes);
}
@Test
- public void testSerializeReplaceWithClusteredByAttributes() throws
JsonProcessingException
+ public void testReplaceAttributesWithClusteredBy()
{
- ExplainAttributes replaceAttributes1 = new ExplainAttributes(
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
Arrays.asList("foo", "CEIL(`f2`)"),
"ALL"
);
- final String expectedAttributes1 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+
"\"clusteredBy\":[\"foo\",\"CEIL(`f2`)\"],"
+ "\"replaceTimeChunks\":\"ALL\""
+ "}";
- Assert.assertEquals(expectedAttributes1,
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
-
+ testSerde(replaceAttributes, expectedAttributes);
+ }
- ExplainAttributes replaceAttributes2 = new ExplainAttributes(
+ @Test
+ public void testReplaceAttributesWithClusteredByAndTimeChunks()
+ {
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
Arrays.asList("foo", "boo"),
"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
);
- final String expectedAttributes2 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"clusteredBy\":[\"foo\",\"boo\"],"
+
"\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
+ "}";
- Assert.assertEquals(expectedAttributes2,
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
+ testSerde(replaceAttributes, expectedAttributes);
}
+
+ private void testSerde(final ExplainAttributes explainAttributes, final
String expectedSerializedAttributes)
+ {
+ final ExplainAttributes observedAttributes;
+ try {
+ final String observedSerializedAttributes =
MAPPER.writeValueAsString(explainAttributes);
+ assertEquals(expectedSerializedAttributes, observedSerializedAttributes);
+ observedAttributes = MAPPER.readValue(observedSerializedAttributes,
ExplainAttributes.class);
+ }
+ catch (Exception e) {
+ throw DruidException.defensive(e, "Error serializing/deserializing
explain plan[%s].", explainAttributes);
+ }
+ assertEquals(explainAttributes, observedAttributes);
+ }
+
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
new file mode 100644
index 00000000000..51d66f03816
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.SqlType;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.rpc.MockServiceClient;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+import org.apache.druid.sql.http.ExplainPlan;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.sql.http.SqlParameter;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class BrokerClientImplTest
+{
+ private ObjectMapper jsonMapper;
+ private MockServiceClient serviceClient;
+ private BrokerClient brokerClient;
+
+ @Before
+ public void setup()
+ {
+ jsonMapper = new DefaultObjectMapper();
+ serviceClient = new MockServiceClient();
+ brokerClient = new BrokerClientImpl(serviceClient, jsonMapper);
+ }
+
+ @After
+ public void tearDown()
+ {
+ serviceClient.verify();
+ }
+
+ @Test
+ public void testSubmitSqlTask() throws Exception
+ {
+ final SqlQuery query = new SqlQuery(
+ "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
+ ResultFormat.ARRAY,
+ true,
+ true,
+ true,
+ ImmutableMap.of("useCache", false),
+ ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
+ );
+ final SqlTaskStatus taskStatus = new SqlTaskStatus("taskId1",
TaskState.RUNNING, null);
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+ .jsonContent(jsonMapper, query),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ jsonMapper.writeValueAsBytes(taskStatus)
+ );
+
+ assertEquals(taskStatus, brokerClient.submitSqlTask(query).get());
+ }
+
+ @Test
+ public void testFetchExplainPlan() throws Exception
+ {
+ final SqlQuery query = new SqlQuery(
+ "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
+ ResultFormat.ARRAY,
+ true,
+ true,
+ true,
+ ImmutableMap.of("useCache", false),
+ ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
+ );
+ final SqlQuery explainQuery = new SqlQuery(
+ StringUtils.format("EXPLAIN PLAN FOR %s", query.getQuery()),
+ null,
+ false,
+ false,
+ false,
+ null,
+ null
+ );
+
+ final String plan =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultForm
[...]
+ final String resources =
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
+ final ExplainAttributes attributes = new ExplainAttributes("REPLACE",
"foo", Granularities.ALL, null, "all");
+
+ final List<Map<String, Object>> givenPlans = ImmutableList.of(
+ ImmutableMap.of(
+ "PLAN",
+ plan,
+ "RESOURCES",
+ resources,
+ "ATTRIBUTES",
+ jsonMapper.writeValueAsString(attributes)
+ )
+ );
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+ .jsonContent(jsonMapper, explainQuery),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ jsonMapper.writeValueAsBytes(givenPlans)
+ );
+
+ assertEquals(
+ ImmutableList.of(new ExplainPlan(plan, resources, attributes)),
+ brokerClient.fetchExplainPlan(query).get()
+ );
+ }
+
+}
diff --git
a/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java
b/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java
new file mode 100644
index 00000000000..9b59ad86733
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.guice;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.ConfigurationException;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.sql.client.BrokerClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+public class BrokerServiceModuleTest
+{
+ private Injector injector;
+
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock
+ private HttpClient httpClient;
+
+ @Mock
+ private DruidNodeDiscoveryProvider discoveryProvider;
+
+ @Mock
+ private ServiceLocator serviceLocator;
+
+ @Mock
+ private ServiceClientFactory serviceClientFactory;
+
+ @Before
+ public void setUp()
+ {
+ injector = Guice.createInjector(
+ ImmutableList.of(
+ new DruidGuiceExtensions(),
+ new LifecycleModule(),
+ new JacksonModule(),
+ new BrokerServiceModule(),
+ binder -> {
+
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
+ binder.bind(ServiceLocator.class).toInstance(serviceLocator);
+
binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider);
+
binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory);
+ }
+ )
+ );
+ }
+
+ @Test
+ public void testGetServiceClientFactory()
+ {
+ assertNotNull(injector.getInstance(ServiceClientFactory.class));
+ }
+
+ @Test
+ public void testGetBrokerClient()
+ {
+ assertNotNull(injector.getInstance(BrokerClient.class));
+ }
+
+ @Test
+ public void testGetCoordinatorClient()
+ {
+ assertThrows(
+ ConfigurationException.class,
+ () -> injector.getInstance(CoordinatorClient.class)
+ );
+ }
+
+ @Test
+ public void testGetOverlordClient()
+ {
+ assertThrows(
+ ConfigurationException.class,
+ () -> injector.getInstance(OverlordClient.class)
+ );
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
new file mode 100644
index 00000000000..e3385fc5f51
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ExplainPlanTest
+{
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
+ @Test
+ public void testExplainPlanSerdeForSelectQuery() throws
JsonProcessingException
+ {
+ final String plan =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"inline\",\"columnNames\":[\"EXPR$0\"],\"columnTypes\":[\"LONG\"],\"rows\":[[2]]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"EXPR$0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQuer
[...]
+ final String resources = "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
+ final ExplainAttributes attributes = new ExplainAttributes("SELECT", null,
null, null, null);
+
+ final List<Map<String, Object>> givenPlans = ImmutableList.of(
+ ImmutableMap.of(
+ "PLAN",
+ plan,
+ "RESOURCES",
+ resources,
+ "ATTRIBUTES",
+ MAPPER.writeValueAsString(attributes)
+ )
+ );
+
+ testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources,
attributes)));
+ }
+
+ @Test
+ public void testExplainPlanSerdeForReplaceQuery() throws
JsonProcessingException
+ {
+ final String plan =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultForm
[...]
+ final String resources =
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
+ final ExplainAttributes attributes = new ExplainAttributes("REPLACE",
"dst", Granularities.ALL, null, "all");
+
+ final List<Map<String, Object>> givenPlans = ImmutableList.of(
+ ImmutableMap.of(
+ "PLAN",
+ plan,
+ "RESOURCES",
+ resources,
+ "ATTRIBUTES",
+ MAPPER.writeValueAsString(attributes)
+ )
+ );
+
+ testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources,
attributes)));
+ }
+
+ @Test
+ public void testExplainPlanSerdeForInsertQuery() throws
JsonProcessingException
+ {
+ final String plan =
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultForm
[...]
+ final String resources =
"[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
+ final ExplainAttributes attributes = new ExplainAttributes("INSERT",
"foo", Granularities.DAY, ImmutableList.of("floor_m1", "dim1", "CEIL(\"m2\")"),
null);
+
+ final List<Map<String, Object>> givenPlans = ImmutableList.of(
+ ImmutableMap.of(
+ "PLAN",
+ plan,
+ "RESOURCES",
+ resources,
+ "ATTRIBUTES",
+ MAPPER.writeValueAsString(attributes)
+ )
+ );
+
+ testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources,
attributes)));
+ }
+
+ private void testSerde(
+ final List<Map<String, Object>> givenPlans,
+ final List<ExplainPlan> expectedExplainPlans
+ )
+ {
+ final List<ExplainPlan> observedExplainPlans;
+ try {
+ observedExplainPlans = MAPPER.readValue(
+ MAPPER.writeValueAsString(givenPlans),
+ new TypeReference<List<ExplainPlan>>() {}
+ );
+ }
+ catch (Exception e) {
+ throw DruidException.defensive(e, "Error deserializing given plans[%s]
into explain plans.", givenPlans);
+ }
+ assertEquals(expectedExplainPlans, observedExplainPlans);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org