This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 187e21afaea Add `BrokerClient` implementation (#17382)
187e21afaea is described below

commit 187e21afaea8495d94ed780254beef9b12536757
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Oct 21 11:05:53 2024 -0700

    Add `BrokerClient` implementation (#17382)
    
    This patch is extracted from PR 17353.
    
    Changes:
    
    - Added BrokerClient and BrokerClientImpl to the sql package that leverages 
the ServiceClient functionality; similar to OverlordClient and 
CoordinatorClient implementations in the server module.
    - For now, only two broker API stubs are added: submitSqlTask() and 
fetchExplainPlan().
    - Added a new POJO class ExplainPlan that encapsulates explain plan info.
    - Deprecated org.apache.druid.discovery.BrokerClient in favor of the new 
BrokerClient in this patch.
    - Clean up ExplainAttributesTest a bit and added serde verification.
---
 .../org/apache/druid/discovery/BrokerClient.java   |   4 +
 .../druid/rpc/guice/ServiceClientModule.java       |  15 ++-
 .../druid/rpc/guice/ServiceClientModuleTest.java   |  99 ++++++++++++++
 .../sql/calcite/planner/ExplainAttributes.java     |  26 ++++
 .../java/org/apache/druid/sql/client/Broker.java   |  34 +++++
 .../org/apache/druid/sql/client/BrokerClient.java  |  51 +++++++
 .../apache/druid/sql/client/BrokerClientImpl.java  |  84 ++++++++++++
 .../druid/sql/guice/BrokerServiceModule.java       |  91 +++++++++++++
 .../org/apache/druid/sql/http/ExplainPlan.java     | 117 ++++++++++++++++
 .../sql/calcite/planner/ExplainAttributesTest.java |  81 +++++++----
 .../druid/sql/client/BrokerClientImplTest.java     | 148 +++++++++++++++++++++
 .../druid/sql/guice/BrokerServiceModuleTest.java   | 114 ++++++++++++++++
 .../org/apache/druid/sql/http/ExplainPlanTest.java | 122 +++++++++++++++++
 13 files changed, 953 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java 
b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
index bdee9b8dfe4..a0ddbf42bed 100644
--- a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import 
org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
 import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.rpc.ServiceClient;
 import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -41,7 +42,10 @@ import java.util.concurrent.ExecutionException;
 
 /**
  * This class facilitates interaction with Broker.
+ * Note that this should be removed and reconciled with 
org.apache.druid.sql.client.BrokerClient, which has the
+ * built-in functionality of {@link ServiceClient}, and proper Guice and 
service discovery wired in.
  */
+@Deprecated
 public class BrokerClient
 {
   private static final int MAX_RETRIES = 5;
diff --git 
a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java 
b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
index 51dd2b89d73..94dfeb29d95 100644
--- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
+++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
@@ -47,8 +47,8 @@ import java.util.concurrent.ScheduledExecutorService;
 
 public class ServiceClientModule implements DruidModule
 {
+  public static final int CLIENT_MAX_ATTEMPTS = 6;
   private static final int CONNECT_EXEC_THREADS = 4;
-  private static final int CLIENT_MAX_ATTEMPTS = 6;
 
   @Override
   public void configure(Binder binder)
@@ -59,11 +59,9 @@ public class ServiceClientModule implements DruidModule
   @Provides
   @LazySingleton
   @EscalatedGlobal
-  public ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal final 
HttpClient httpClient)
+  public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final 
HttpClient httpClient)
   {
-    final ScheduledExecutorService connectExec =
-        ScheduledExecutors.fixed(CONNECT_EXEC_THREADS, 
"ServiceClientFactory-%d");
-    return new ServiceClientFactoryImpl(httpClient, connectExec);
+    return makeServiceClientFactory(httpClient);
   }
 
   @Provides
@@ -117,4 +115,11 @@ public class ServiceClientModule implements DruidModule
         jsonMapper
     );
   }
+
+  public static ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal 
final HttpClient httpClient)
+  {
+    final ScheduledExecutorService connectExec =
+        ScheduledExecutors.fixed(CONNECT_EXEC_THREADS, 
"ServiceClientFactory-%d");
+    return new ServiceClientFactoryImpl(httpClient, connectExec);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java 
b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
new file mode 100644
index 00000000000..5d8a07d20a8
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.rpc.guice;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import static org.junit.Assert.assertNotNull;
+
+public class ServiceClientModuleTest
+{
+  private Injector injector;
+
+  @Rule
+  public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+  @Mock
+  private HttpClient httpClient;
+
+  @Mock
+  private DruidNodeDiscoveryProvider discoveryProvider;
+
+  @Mock
+  private ServiceLocator serviceLocator;
+
+  @Mock
+  private ServiceClientFactory serviceClientFactory;
+
+  @Before
+  public void setUp()
+  {
+    injector = Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new LifecycleModule(),
+          new JacksonModule(),
+          new ServiceClientModule(),
+          binder -> {
+            
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
+            binder.bind(ServiceLocator.class).toInstance(serviceLocator);
+            
binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider);
+            
binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory);
+          }
+          )
+    );
+  }
+
+  @Test
+  public void testGetServiceClientFactory()
+  {
+    assertNotNull(injector.getInstance(ServiceClientFactory.class));
+  }
+
+  @Test
+  public void testGetOverlordClient()
+  {
+    assertNotNull(injector.getInstance(OverlordClient.class));
+  }
+
+  @Test
+  public void testGetCoordinatorClient()
+  {
+    assertNotNull(injector.getInstance(CoordinatorClient.class));
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
index e2ae4fa7a10..533de7d58f2 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
@@ -19,12 +19,14 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.granularity.Granularity;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * ExplainAttributes holds the attributes of a SQL statement that is used in 
the EXPLAIN PLAN result.
@@ -45,6 +47,7 @@ public final class ExplainAttributes
   @Nullable
   private final String replaceTimeChunks;
 
+  @JsonCreator
   public ExplainAttributes(
       @JsonProperty("statementType") final String statementType,
       @JsonProperty("targetDataSource") @Nullable final String 
targetDataSource,
@@ -117,6 +120,29 @@ public final class ExplainAttributes
     return replaceTimeChunks;
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ExplainAttributes that = (ExplainAttributes) o;
+    return Objects.equals(statementType, that.statementType)
+           && Objects.equals(targetDataSource, that.targetDataSource)
+           && Objects.equals(partitionedBy, that.partitionedBy)
+           && Objects.equals(clusteredBy, that.clusteredBy)
+           && Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(statementType, targetDataSource, partitionedBy, 
clusteredBy, replaceTimeChunks);
+  }
+
   @Override
   public String toString()
   {
diff --git a/sql/src/main/java/org/apache/druid/sql/client/Broker.java 
b/sql/src/main/java/org/apache/druid/sql/client/Broker.java
new file mode 100644
index 00000000000..fb20c5166c8
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/Broker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Broker
+{
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java 
b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
new file mode 100644
index 00000000000..14cbbb7bff6
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.sql.http.ExplainPlan;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+
+import java.util.List;
+
+/**
+ * High-level Broker client.
+ * <p>
+ * All methods return futures, enabling asynchronous logic. If you want a 
synchronous response, use
+ * {@code FutureUtils.get} or {@code FutureUtils.getUnchecked}.
+ * Futures resolve to exceptions in the manner described by {@link 
org.apache.druid.rpc.ServiceClient#asyncRequest}.
+ * </p>
+ * Typically acquired via Guice, where it is registered using {@link 
org.apache.druid.rpc.guice.ServiceClientModule}.
+ */
+public interface BrokerClient
+{
+  /**
+   * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint.
+   */
+  ListenableFuture<SqlTaskStatus> submitSqlTask(SqlQuery sqlQuery);
+
+  /**
+   * Fetches the explain plan for the given {@code sqlQuery} from the Broker's 
SQL task endpoint.
+   *
+   * @param sqlQuery the SQL query for which the {@code EXPLAIN PLAN FOR} 
information is to be fetched
+   */
+  ListenableFuture<List<ExplainPlan>> fetchExplainPlan(SqlQuery sqlQuery);
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java 
b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
new file mode 100644
index 00000000000..b3e064341e6
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.sql.http.ExplainPlan;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.List;
+
+public class BrokerClientImpl implements BrokerClient
+{
+  private final ServiceClient client;
+  private final ObjectMapper jsonMapper;
+
+  public BrokerClientImpl(final ServiceClient client, final ObjectMapper 
jsonMapper)
+  {
+    this.client = client;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public ListenableFuture<SqlTaskStatus> submitSqlTask(final SqlQuery sqlQuery)
+  {
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+                .jsonContent(jsonMapper, sqlQuery),
+            new BytesFullResponseHandler()
+        ),
+        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), 
SqlTaskStatus.class)
+    );
+  }
+
+  @Override
+  public ListenableFuture<List<ExplainPlan>> fetchExplainPlan(final SqlQuery 
sqlQuery)
+  {
+    final SqlQuery explainSqlQuery = new SqlQuery(
+        StringUtils.format("EXPLAIN PLAN FOR %s", sqlQuery.getQuery()),
+        null,
+        false,
+        false,
+        false,
+        null,
+        null
+    );
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+                .jsonContent(jsonMapper, explainSqlQuery),
+            new BytesFullResponseHandler()
+        ),
+        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new 
TypeReference<List<ExplainPlan>>() {})
+    );
+  }
+}
+
diff --git 
a/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java 
b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
new file mode 100644
index 00000000000..05e022f8310
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.rpc.DiscoveryServiceLocator;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.rpc.guice.ServiceClientModule;
+import org.apache.druid.sql.client.Broker;
+import org.apache.druid.sql.client.BrokerClient;
+import org.apache.druid.sql.client.BrokerClientImpl;
+
+/**
+ * Module that processes can install if they require a {@link BrokerClient}.
+ * <p>
+ * Similar to {@link ServiceClientModule}, but since {@link BrokerClient} 
depends
+ * on classes from the sql module, this is a separate module within the sql 
package.
+ * </p>
+ */
+public class BrokerServiceModule implements DruidModule
+{
+  @Override
+  public void configure(Binder binder)
+  {
+    // Nothing to do.
+  }
+
+  @Provides
+  @LazySingleton
+  @EscalatedGlobal
+  public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final 
HttpClient httpClient)
+  {
+    return ServiceClientModule.makeServiceClientFactory(httpClient);
+  }
+
+  @Provides
+  @ManageLifecycle
+  @Broker
+  public ServiceLocator makeBrokerServiceLocator(final 
DruidNodeDiscoveryProvider discoveryProvider)
+  {
+    return new DiscoveryServiceLocator(discoveryProvider, NodeRole.BROKER);
+  }
+
+  @Provides
+  @LazySingleton
+  public BrokerClient makeBrokerClient(
+      @Json final ObjectMapper jsonMapper,
+      @EscalatedGlobal final ServiceClientFactory clientFactory,
+      @Broker final ServiceLocator serviceLocator
+  )
+  {
+    return new BrokerClientImpl(
+        clientFactory.makeClient(
+            NodeRole.BROKER.getJsonName(),
+            serviceLocator,
+            
StandardRetryPolicy.builder().maxAttempts(ServiceClientModule.CLIENT_MAX_ATTEMPTS).build()
+        ),
+        jsonMapper
+    );
+  }
+}
+
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java 
b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
new file mode 100644
index 00000000000..68defc4b2a4
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Class that encapsulates the information of a single plan for an {@code 
EXPLAIN PLAN FOR} query.
+ * <p>
+ * Similar to {@link #getAttributes()}, it's possible to provide more 
structure to {@link #getPlan()},
+ * at least for the native query explain, but there's currently no use case 
for it.
+ * </p>
+ */
+public class ExplainPlan
+{
+  @JsonProperty("PLAN")
+  private final String plan;
+
+  @JsonProperty("RESOURCES")
+  private final String resources;
+
+  @JsonProperty("ATTRIBUTES")
+  @JsonDeserialize(using = ExplainAttributesDeserializer.class)
+  private final ExplainAttributes attributes;
+
+  @JsonCreator
+  public ExplainPlan(
+      @JsonProperty("PLAN") final String plan,
+      @JsonProperty("RESOURCES") final String resources,
+      @JsonProperty("ATTRIBUTES") final ExplainAttributes attributes
+  )
+  {
+    this.plan = plan;
+    this.resources = resources;
+    this.attributes = attributes;
+  }
+
+  public String getPlan()
+  {
+    return plan;
+  }
+
+  public String getResources()
+  {
+    return resources;
+  }
+
+  public ExplainAttributes getAttributes()
+  {
+    return attributes;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ExplainPlan that = (ExplainPlan) o;
+    return Objects.equals(plan, that.plan)
+           && Objects.equals(resources, that.resources)
+           && Objects.equals(attributes, that.attributes);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(plan, resources, attributes);
+  }
+
+  /**
+   * Custom deserializer for {@link ExplainAttributes} because the value for 
{@link #attributes} in the plan
+   * is encoded as a JSON string. This deserializer tells Jackson on how to 
parse the JSON string
+   * and map it to the fields in the {@link ExplainAttributes} class.
+   */
+  private static class ExplainAttributesDeserializer extends 
JsonDeserializer<ExplainAttributes>
+  {
+    @Override
+    public ExplainAttributes deserialize(JsonParser jsonParser, 
DeserializationContext context) throws IOException
+    {
+      final ObjectMapper objectMapper = (ObjectMapper) jsonParser.getCodec();
+      return objectMapper.readValue(jsonParser.getText(), 
ExplainAttributes.class);
+    }
+  }
+}
+
+
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
index 53e2abf2749..d203dd34002 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
@@ -19,8 +19,8 @@
 
 package org.apache.druid.sql.calcite.planner;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.junit.Assert;
@@ -28,14 +28,16 @@ import org.junit.Test;
 
 import java.util.Arrays;
 
+import static org.junit.Assert.assertEquals;
+
 public class ExplainAttributesTest
 {
-  private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new 
DefaultObjectMapper();
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
 
   @Test
-  public void testSimpleGetters()
+  public void testGetters()
   {
-    ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", null, 
null, null, null);
+    final ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", 
null, null, null, null);
     Assert.assertEquals("SELECT", selectAttributes.getStatementType());
     Assert.assertNull(selectAttributes.getTargetDataSource());
     Assert.assertNull(selectAttributes.getPartitionedBy());
@@ -44,9 +46,9 @@ public class ExplainAttributesTest
   }
 
   @Test
-  public void testSerializeSelectAttributes() throws JsonProcessingException
+  public void testSerdeOfSelectAttributes()
   {
-    ExplainAttributes selectAttributes = new ExplainAttributes(
+    final ExplainAttributes selectAttributes = new ExplainAttributes(
         "SELECT",
         null,
         null,
@@ -56,13 +58,14 @@ public class ExplainAttributesTest
     final String expectedAttributes = "{"
                                       + "\"statementType\":\"SELECT\""
                                       + "}";
-    Assert.assertEquals(expectedAttributes, 
DEFAULT_OBJECT_MAPPER.writeValueAsString(selectAttributes));
+
+    testSerde(selectAttributes, expectedAttributes);
   }
 
   @Test
-  public void testSerializeInsertAttributes() throws JsonProcessingException
+  public void testSerdeOfInsertAttributes()
   {
-    ExplainAttributes insertAttributes = new ExplainAttributes(
+    final ExplainAttributes insertAttributes = new ExplainAttributes(
         "INSERT",
         "foo",
         Granularities.DAY,
@@ -74,13 +77,13 @@ public class ExplainAttributesTest
                                       + "\"targetDataSource\":\"foo\","
                                       + "\"partitionedBy\":\"DAY\""
                                       + "}";
-    Assert.assertEquals(expectedAttributes, 
DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
+    testSerde(insertAttributes, expectedAttributes);
   }
 
   @Test
-  public void testSerializeInsertAllAttributes() throws JsonProcessingException
+  public void testSerdeOfInsertAllAttributes()
   {
-    ExplainAttributes insertAttributes = new ExplainAttributes(
+    final ExplainAttributes insertAttributes = new ExplainAttributes(
         "INSERT",
         "foo",
         Granularities.ALL,
@@ -92,78 +95,100 @@ public class ExplainAttributesTest
                                       + "\"targetDataSource\":\"foo\","
                                       + "\"partitionedBy\":{\"type\":\"all\"}"
                                       + "}";
-    Assert.assertEquals(expectedAttributes, 
DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
+    testSerde(insertAttributes, expectedAttributes);
   }
 
   @Test
-  public void testSerializeReplaceAttributes() throws JsonProcessingException
+  public void testSerdeOfReplaceAttributes()
   {
-    ExplainAttributes replaceAttributes1 = new ExplainAttributes(
+    final ExplainAttributes replaceAttributes = new ExplainAttributes(
         "REPLACE",
         "foo",
         Granularities.HOUR,
         null,
         "ALL"
     );
-    final String expectedAttributes1 = "{"
+    final String expectedAttributes = "{"
                                        + "\"statementType\":\"REPLACE\","
                                        + "\"targetDataSource\":\"foo\","
                                        + "\"partitionedBy\":\"HOUR\","
                                        + "\"replaceTimeChunks\":\"ALL\""
                                        + "}";
-    Assert.assertEquals(expectedAttributes1, 
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
+    testSerde(replaceAttributes, expectedAttributes);
 
+  }
 
-    ExplainAttributes replaceAttributes2 = new ExplainAttributes(
+  @Test
+  public void testSerdeOfReplaceAttributesWithTimeChunks()
+  {
+    final ExplainAttributes replaceAttributes = new ExplainAttributes(
         "REPLACE",
         "foo",
         Granularities.HOUR,
         null,
         "2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
     );
-    final String expectedAttributes2 = "{"
+    final String expectedAttributes = "{"
                                        + "\"statementType\":\"REPLACE\","
                                        + "\"targetDataSource\":\"foo\","
                                        + "\"partitionedBy\":\"HOUR\","
                                        + 
"\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
                                        + "}";
-    Assert.assertEquals(expectedAttributes2, 
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
+    testSerde(replaceAttributes, expectedAttributes);
   }
 
   @Test
-  public void testSerializeReplaceWithClusteredByAttributes() throws 
JsonProcessingException
+  public void testReplaceAttributesWithClusteredBy()
   {
-    ExplainAttributes replaceAttributes1 = new ExplainAttributes(
+    final ExplainAttributes replaceAttributes = new ExplainAttributes(
         "REPLACE",
         "foo",
         Granularities.HOUR,
         Arrays.asList("foo", "CEIL(`f2`)"),
         "ALL"
     );
-    final String expectedAttributes1 = "{"
+    final String expectedAttributes = "{"
                                        + "\"statementType\":\"REPLACE\","
                                        + "\"targetDataSource\":\"foo\","
                                        + "\"partitionedBy\":\"HOUR\","
                                        + 
"\"clusteredBy\":[\"foo\",\"CEIL(`f2`)\"],"
                                        + "\"replaceTimeChunks\":\"ALL\""
                                        + "}";
-    Assert.assertEquals(expectedAttributes1, 
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
-
+    testSerde(replaceAttributes, expectedAttributes);
+  }
 
-    ExplainAttributes replaceAttributes2 = new ExplainAttributes(
+  @Test
+  public void testReplaceAttributesWithClusteredByAndTimeChunks()
+  {
+    final ExplainAttributes replaceAttributes = new ExplainAttributes(
         "REPLACE",
         "foo",
         Granularities.HOUR,
         Arrays.asList("foo", "boo"),
         "2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
     );
-    final String expectedAttributes2 = "{"
+    final String expectedAttributes = "{"
                                        + "\"statementType\":\"REPLACE\","
                                        + "\"targetDataSource\":\"foo\","
                                        + "\"partitionedBy\":\"HOUR\","
                                        + "\"clusteredBy\":[\"foo\",\"boo\"],"
                                        + 
"\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
                                        + "}";
-    Assert.assertEquals(expectedAttributes2, 
DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
+    testSerde(replaceAttributes, expectedAttributes);
   }
+
+  private void testSerde(final ExplainAttributes explainAttributes, final 
String expectedSerializedAttributes)
+  {
+    final ExplainAttributes observedAttributes;
+    try {
+      final String observedSerializedAttributes = 
MAPPER.writeValueAsString(explainAttributes);
+      assertEquals(expectedSerializedAttributes, observedSerializedAttributes);
+      observedAttributes = MAPPER.readValue(observedSerializedAttributes, 
ExplainAttributes.class);
+    }
+    catch (Exception e) {
+      throw DruidException.defensive(e, "Error serializing/deserializing 
explain plan[%s].", explainAttributes);
+    }
+    assertEquals(explainAttributes, observedAttributes);
+  }
+
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java 
b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
new file mode 100644
index 00000000000..51d66f03816
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.SqlType;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.rpc.MockServiceClient;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+import org.apache.druid.sql.http.ExplainPlan;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.sql.http.SqlParameter;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class BrokerClientImplTest
+{
+  private ObjectMapper jsonMapper;
+  private MockServiceClient serviceClient;
+  private BrokerClient brokerClient;
+
+  @Before
+  public void setup()
+  {
+    jsonMapper = new DefaultObjectMapper();
+    serviceClient = new MockServiceClient();
+    brokerClient = new BrokerClientImpl(serviceClient, jsonMapper);
+  }
+
+  @After
+  public void tearDown()
+  {
+    serviceClient.verify();
+  }
+
+  @Test
+  public void testSubmitSqlTask() throws Exception
+  {
+    final SqlQuery query = new SqlQuery(
+        "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
+        ResultFormat.ARRAY,
+        true,
+        true,
+        true,
+        ImmutableMap.of("useCache", false),
+        ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
+    );
+    final SqlTaskStatus taskStatus = new SqlTaskStatus("taskId1", 
TaskState.RUNNING, null);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+            .jsonContent(jsonMapper, query),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(taskStatus)
+    );
+
+    assertEquals(taskStatus, brokerClient.submitSqlTask(query).get());
+  }
+
+  @Test
+  public void testFetchExplainPlan() throws Exception
+  {
+    final SqlQuery query = new SqlQuery(
+        "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
+        ResultFormat.ARRAY,
+        true,
+        true,
+        true,
+        ImmutableMap.of("useCache", false),
+        ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
+    );
+    final SqlQuery explainQuery = new SqlQuery(
+        StringUtils.format("EXPLAIN PLAN FOR %s", query.getQuery()),
+        null,
+        false,
+        false,
+        false,
+        null,
+        null
+    );
+
+    final String plan = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultForm
 [...]
+    final String resources = 
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
+    final ExplainAttributes attributes = new ExplainAttributes("REPLACE", 
"foo", Granularities.ALL, null, "all");
+
+    final List<Map<String, Object>> givenPlans = ImmutableList.of(
+        ImmutableMap.of(
+            "PLAN",
+            plan,
+            "RESOURCES",
+            resources,
+            "ATTRIBUTES",
+            jsonMapper.writeValueAsString(attributes)
+        )
+    );
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+            .jsonContent(jsonMapper, explainQuery),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(givenPlans)
+    );
+
+    assertEquals(
+        ImmutableList.of(new ExplainPlan(plan, resources, attributes)),
+        brokerClient.fetchExplainPlan(query).get()
+    );
+  }
+
+}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java 
b/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java
new file mode 100644
index 00000000000..9b59ad86733
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.guice;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.ConfigurationException;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.sql.client.BrokerClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+public class BrokerServiceModuleTest
+{
+  private Injector injector;
+
+  @Rule
+  public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+  @Mock
+  private HttpClient httpClient;
+
+  @Mock
+  private DruidNodeDiscoveryProvider discoveryProvider;
+
+  @Mock
+  private ServiceLocator serviceLocator;
+
+  @Mock
+  private ServiceClientFactory serviceClientFactory;
+
+  @Before
+  public void setUp()
+  {
+    injector = Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new LifecycleModule(),
+            new JacksonModule(),
+            new BrokerServiceModule(),
+            binder -> {
+              
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
+              binder.bind(ServiceLocator.class).toInstance(serviceLocator);
+              
binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider);
+              
binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory);
+            }
+        )
+    );
+  }
+
+  @Test
+  public void testGetServiceClientFactory()
+  {
+    assertNotNull(injector.getInstance(ServiceClientFactory.class));
+  }
+
+  @Test
+  public void testGetBrokerClient()
+  {
+    assertNotNull(injector.getInstance(BrokerClient.class));
+  }
+
+  @Test
+  public void testGetCoordinatorClient()
+  {
+    assertThrows(
+        ConfigurationException.class,
+        () -> injector.getInstance(CoordinatorClient.class)
+    );
+  }
+
+  @Test
+  public void testGetOverlordClient()
+  {
+    assertThrows(
+        ConfigurationException.class,
+        () -> injector.getInstance(OverlordClient.class)
+    );
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java 
b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
new file mode 100644
index 00000000000..e3385fc5f51
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ExplainPlanTest
+{
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
+  @Test
+  public void testExplainPlanSerdeForSelectQuery() throws 
JsonProcessingException
+  {
+    final String plan = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"inline\",\"columnNames\":[\"EXPR$0\"],\"columnTypes\":[\"LONG\"],\"rows\":[[2]]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"EXPR$0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQuer
 [...]
+    final String resources = "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
+    final ExplainAttributes attributes = new ExplainAttributes("SELECT", null, 
null, null, null);
+
+    final List<Map<String, Object>> givenPlans = ImmutableList.of(
+        ImmutableMap.of(
+            "PLAN",
+            plan,
+            "RESOURCES",
+            resources,
+            "ATTRIBUTES",
+            MAPPER.writeValueAsString(attributes)
+        )
+    );
+
+    testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, 
attributes)));
+  }
+
+  @Test
+  public void testExplainPlanSerdeForReplaceQuery() throws 
JsonProcessingException
+  {
+    final String plan = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultForm
 [...]
+    final String resources = 
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
+    final ExplainAttributes attributes = new ExplainAttributes("REPLACE", 
"dst", Granularities.ALL, null, "all");
+
+    final List<Map<String, Object>> givenPlans = ImmutableList.of(
+        ImmutableMap.of(
+            "PLAN",
+            plan,
+            "RESOURCES",
+            resources,
+            "ATTRIBUTES",
+            MAPPER.writeValueAsString(attributes)
+        )
+    );
+
+    testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, 
attributes)));
+  }
+
+  @Test
+  public void testExplainPlanSerdeForInsertQuery() throws 
JsonProcessingException
+  {
+    final String plan = 
"[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultForm
 [...]
+    final String resources = 
"[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
+    final ExplainAttributes attributes = new ExplainAttributes("INSERT", 
"foo", Granularities.DAY, ImmutableList.of("floor_m1", "dim1", "CEIL(\"m2\")"), 
null);
+
+    final List<Map<String, Object>> givenPlans = ImmutableList.of(
+        ImmutableMap.of(
+            "PLAN",
+            plan,
+            "RESOURCES",
+            resources,
+            "ATTRIBUTES",
+            MAPPER.writeValueAsString(attributes)
+        )
+    );
+
+    testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, 
attributes)));
+  }
+
+  private void testSerde(
+      final List<Map<String, Object>> givenPlans,
+      final List<ExplainPlan> expectedExplainPlans
+  )
+  {
+    final List<ExplainPlan> observedExplainPlans;
+    try {
+      observedExplainPlans = MAPPER.readValue(
+          MAPPER.writeValueAsString(givenPlans),
+          new TypeReference<List<ExplainPlan>>() {}
+      );
+    }
+    catch (Exception e) {
+      throw DruidException.defensive(e, "Error deserializing given plans[%s] 
into explain plans.", givenPlans);
+    }
+    assertEquals(expectedExplainPlans, observedExplainPlans);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to