This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6bb3b88d05f89fb7a1a54f302b4d329cbf4f69ec
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Tue Aug 4 17:03:46 2020 -0700

    IMPALA-9180 (part 1): Remove legacy ImpalaInternalService
    
    The legacy Thrift based Impala internal service has been deprecated
    and can be removed now.
    
    This patch removes ImpalaInternalService. All infrastructure around it
    is cleaned up, except for one remaining use of the be_port flag:
    StatestoreSubscriber::subscriber_id includes be_port, and we cannot
    change the format of subscriber_id now. This remaining be_port issue will
    be fixed in a succeeding patch (part 4).
    TQueryCtx.coord_address is changed to TQueryCtx.coord_hostname since the
    port in TQueryCtx.coord_address was set to be_port and is unused now.
    Also rename TQueryCtx.coord_krpc_address to TQueryCtx.coord_ip_address.
    
    Testing:
     - Passed the exhaustive test.
     - Passed Quasar-L0 test.
    
    Change-Id: I5fa83c8009590124dded4783f77ef70fa30119e6
    Reviewed-on: http://gerrit.cloudera.org:8080/16291
    Reviewed-by: Thomas Tauber-Marshall <tmarsh...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/generated-sources/gen-cpp/CMakeLists.txt   |  1 -
 be/src/benchmarks/expr-benchmark.cc           |  7 ++-
 be/src/exprs/expr-test.cc                     |  4 +-
 be/src/exprs/utility-functions-ir.cc          |  4 +-
 be/src/rpc/impala-service-pool.cc             | 16 +++++--
 be/src/rpc/impala-service-pool.h              | 10 ++++
 be/src/rpc/rpc-mgr.cc                         |  7 +++
 be/src/rpc/rpc-mgr.h                          |  3 ++
 be/src/rpc/thrift-server-test.cc              | 26 -----------
 be/src/runtime/backend-client.h               | 46 -------------------
 be/src/runtime/client-cache-types.h           |  8 ----
 be/src/runtime/coordinator-backend-state.cc   |  1 -
 be/src/runtime/data-stream-test.cc            |  1 -
 be/src/runtime/exec-env.cc                    | 30 +++++-------
 be/src/runtime/exec-env.h                     | 19 ++++----
 be/src/runtime/fragment-instance-state.cc     |  1 -
 be/src/runtime/fragment-instance-state.h      |  1 -
 be/src/runtime/initial-reservations.cc        |  4 +-
 be/src/runtime/query-exec-mgr.cc              |  3 +-
 be/src/runtime/query-state.cc                 |  8 ++--
 be/src/runtime/runtime-filter-bank.cc         | 12 ++---
 be/src/runtime/test-env.cc                    |  4 +-
 be/src/scheduling/scheduler-test-util.h       |  1 -
 be/src/service/CMakeLists.txt                 |  1 -
 be/src/service/client-request-state.cc        |  3 +-
 be/src/service/control-service.cc             |  3 +-
 be/src/service/impala-internal-service.cc     | 46 -------------------
 be/src/service/impala-internal-service.h      | 40 ----------------
 be/src/service/impala-server.cc               | 66 ++++++---------------------
 be/src/service/impala-server.h                | 17 ++-----
 be/src/service/impalad-main.cc                |  6 +--
 be/src/service/session-expiry-test.cc         |  1 -
 be/src/testutil/in-process-servers.cc         | 32 +++++--------
 be/src/testutil/in-process-servers.h          |  7 ++-
 be/src/util/debug-util.cc                     |  4 +-
 bin/generate_minidump_collection_testdata.py  |  1 -
 common/thrift/ImpalaInternalService.thrift    | 10 ++--
 tests/custom_cluster/test_blacklist.py        |  8 ++--
 tests/custom_cluster/test_process_failures.py |  2 +-
 tests/custom_cluster/test_query_retries.py    |  2 +-
 tests/custom_cluster/test_restart_services.py | 11 ++---
 tests/webserver/test_web_pages.py             |  4 +-
 42 files changed, 130 insertions(+), 351 deletions(-)

diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt 
b/be/generated-sources/gen-cpp/CMakeLists.txt
index 56093f4..271dcb7 100644
--- a/be/generated-sources/gen-cpp/CMakeLists.txt
+++ b/be/generated-sources/gen-cpp/CMakeLists.txt
@@ -30,7 +30,6 @@ set(SRC_FILES
   CatalogService_types.cpp
   CatalogInternalService_constants.cpp
   CatalogInternalService_types.cpp
-  ImpalaInternalService.cpp
   ImpalaInternalService_constants.cpp
   ImpalaInternalService_types.cpp
   ImpalaService.cpp
diff --git a/be/src/benchmarks/expr-benchmark.cc 
b/be/src/benchmarks/expr-benchmark.cc
index be42114..689295f 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -40,10 +40,8 @@
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/ImpalaService.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-server.h"
 #include "codegen/llvm-codegen.h"
@@ -81,8 +79,9 @@ class Planner {
     query_ctx.client_request.stmt = stmt;
     query_ctx.client_request.query_options = query_options_;
     query_ctx.__set_session(session_state_);
-    TNetworkAddress dummy;
-    ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx);
+    string dummy_hostname = "";
+    TNetworkAddress dummy_addr;
+    ImpalaServer::PrepareQueryContext(dummy_hostname, dummy_addr, &query_ctx);
     runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
     TPlanFragment* fragment = runtime_state_->obj_pool()->Add(new 
TPlanFragment());
     PlanFragmentCtxPB* fragment_ctx =
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 27fdee8..72e7aac 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -5430,9 +5430,9 @@ TEST_P(ExprTest, UtilityFunctions) {
   TestIsNull("fnv_hash(NULL)", TYPE_BIGINT);
 }
 
-// Test that UtilityFunctions::Coordinator() will return null if coord_address 
is unset
+// Test that UtilityFunctions::Coordinator() will return null if 
coord_hostname is unset
 TEST_P(ExprTest, CoordinatorFunction) {
-  // Make a RuntimeState where the query context does not have coord_address 
set.
+  // Make a RuntimeState where the query context does not have coord_hostname 
set.
   // Note that this should never happen in a real impalad.
   RuntimeState state(TQueryCtx(), ExecEnv::GetInstance());
   MemTracker tracker;
diff --git a/be/src/exprs/utility-functions-ir.cc 
b/be/src/exprs/utility-functions-ir.cc
index 0006ff2..bfc143d 100644
--- a/be/src/exprs/utility-functions-ir.cc
+++ b/be/src/exprs/utility-functions-ir.cc
@@ -178,8 +178,8 @@ StringVal UtilityFunctions::CurrentSession(FunctionContext* 
ctx) {
 StringVal UtilityFunctions::Coordinator(FunctionContext* ctx) {
   const TQueryCtx& query_ctx = ctx->impl()->state()->query_ctx();
   // An empty string indicates the coordinator was not set in the query 
request.
-  return query_ctx.__isset.coord_address ?
-      AnyValUtil::FromString(ctx, query_ctx.coord_address.hostname) :
+  return query_ctx.__isset.coord_hostname ?
+      AnyValUtil::FromString(ctx, query_ctx.coord_hostname) :
       StringVal::null();
 }
 
diff --git a/be/src/rpc/impala-service-pool.cc 
b/be/src/rpc/impala-service-pool.cc
index 5a8b275..29125a8 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -96,16 +96,24 @@ Status ImpalaServicePool::Init(int num_threads) {
   return Status::OK();
 }
 
+void ImpalaServicePool::Join() {
+  VLOG_QUERY << "join Impala Service pool\n";
+  std::lock_guard<std::mutex> l(close_lock_);
+  if (is_joined_) return;
+  // TODO (from KRPC): Use a proper thread pool implementation.
+  for (std::unique_ptr<Thread>& thread : threads_) {
+    thread->Join();
+  }
+  is_joined_ = true;
+}
+
 void ImpalaServicePool::Shutdown() {
   service_queue_.Shutdown();
 
   lock_guard<mutex> lock(shutdown_lock_);
   if (closing_) return;
   closing_ = true;
-  // TODO (from KRPC): Use a proper thread pool implementation.
-  for (std::unique_ptr<Thread>& thread : threads_) {
-    thread->Join();
-  }
+  Join();
 
   // Now we must drain the service queue.
   kudu::Status status = kudu::Status::ServiceUnavailable("Service is shutting 
down");
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index 9721757..e61a240 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -56,6 +56,9 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// Start up the thread pool.
   Status Init(int num_threads);
 
+  /// Wait until all working threads complete execution.
+  void Join();
+
   /// Shut down the queue and the thread pool.
   void Shutdown();
 
@@ -113,6 +116,13 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   std::mutex shutdown_lock_;
   bool closing_ = false;
 
+  /// Protects is_joined_.
+  std::mutex close_lock_;
+
+  /// Set as true when all working threads complete execution.
+  /// Protected by 'close_lock_'.
+  bool is_joined_ = false;
+
   /// The address this service is running on.
   const std::string hostname_;
   const std::string port_;
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 3e112a5..689121c 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -220,6 +220,13 @@ Status RpcMgr::StartServices() {
   return Status::OK();
 }
 
+void RpcMgr::Join() {
+  if (services_started_) {
+    if (messenger_.get() == nullptr) return;
+    for (auto service_pool : service_pools_) service_pool->Join();
+  }
+}
+
 void RpcMgr::Shutdown() {
   if (messenger_.get() == nullptr) return;
   for (auto service_pool : service_pools_) service_pool->Shutdown();
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index a141dba..113d1c4 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -151,6 +151,9 @@ class RpcMgr {
   Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
       std::unique_ptr<P>* proxy) WARN_UNUSED_RESULT;
 
+  /// Wait until the threads of all registered service pools complete execution.
+  void Join();
+
   /// Shut down all previously registered services. All service pools are shut 
down.
   /// All acceptor and reactor threads within the messenger are also shut down.
   /// All unprocessed incoming requests will be replied with error messages.
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index ff89372..4e30916 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -44,7 +44,6 @@ DECLARE_string(ssl_minimum_version);
 
 DECLARE_int32(state_store_port);
 
-DECLARE_int32(be_port);
 DECLARE_int32(beeswax_port);
 
 static string IMPALA_HOME(getenv("IMPALA_HOME"));
@@ -510,31 +509,6 @@ TEST(ConcurrencyTest, MaxConcurrentConnections) {
   EXPECT_TRUE(did_reach_max);
 }
 
-/// Test disabled because requires a high ulimit -n on build machines. Since 
the test does
-/// not always fail, we don't lose much coverage by disabling it until we fix 
the build
-/// infra issue.
-TEST(ConcurrencyTest, DISABLED_ManyConcurrentConnections) {
-  // Test that a large number of concurrent connections will all succeed and 
not time out
-  // waiting to be accepted. (IMPALA-4135)
-  // Note that without the fix for IMPALA-4135, this test won't always fail, 
depending on
-  // the hardware that it is run on.
-  int port = GetServerPort();
-  ThriftServer* server;
-  EXPECT_OK(ThriftServerBuilder("DummyServer", MakeProcessor(), 
port).Build(&server));
-  ASSERT_OK(server->Start());
-
-  ThreadPool<int64_t> pool(
-      "group", "test", 256, 10000, [port](int tid, const int64_t& item) {
-        using Client = ThriftClient<ImpalaInternalServiceClient>;
-        Client* client = new Client("127.0.0.1", port, "", nullptr, false);
-        Status status = client->Open();
-        ASSERT_OK(status);
-      });
-  ASSERT_OK(pool.Init());
-  for (int i = 0; i < 1024 * 16; ++i) pool.Offer(i);
-  pool.DrainAndShutdown();
-}
-
 TEST(NoPasswordPemFile, BadServerCertificate) {
   int port = GetServerPort();
   ThriftServer* server;
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
deleted file mode 100644
index 656bbc3..0000000
--- a/be/src/runtime/backend-client.h
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_BACKEND_CLIENT_H
-#define IMPALA_BACKEND_CLIENT_H
-
-#include "runtime/client-cache.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/ImpalaInternalService.h"
-
-namespace impala {
-
-/// Proxy class that extends ImpalaInternalServiceClient to allow callers to 
time
-/// the wall-clock time taken in TransmitData(), so that the time spent 
sending data
-/// between backends in a query can be measured.
-class ImpalaBackendClient : public ImpalaInternalServiceClient {
- public:
-  ImpalaBackendClient(boost::shared_ptr< 
::apache::thrift::protocol::TProtocol> prot)
-    : ImpalaInternalServiceClient(prot) {
-  }
-
-  ImpalaBackendClient(boost::shared_ptr< 
::apache::thrift::protocol::TProtocol> iprot,
-      boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot)
-    : ImpalaInternalServiceClient(iprot, oprot) {
-  }
-
-};
-
-}
-
-#endif // IMPALA_BACKEND_CLIENT_H
diff --git a/be/src/runtime/client-cache-types.h 
b/be/src/runtime/client-cache-types.h
index df9a0e7..ed528a5 100644
--- a/be/src/runtime/client-cache-types.h
+++ b/be/src/runtime/client-cache-types.h
@@ -29,17 +29,9 @@ template<class T>
 class ClientConnection;
 
 /// Common cache / connection types
-class ImpalaInternalServiceClient;
-typedef ClientCache<ImpalaInternalServiceClient> 
ImpalaInternalServiceClientCache;
-typedef ClientConnection<ImpalaInternalServiceClient> 
ImpalaInternalServiceConnection;
-
 class CatalogServiceClientWrapper;
 typedef ClientCache<CatalogServiceClientWrapper> CatalogServiceClientCache;
 typedef ClientConnection<CatalogServiceClientWrapper> CatalogServiceConnection;
-
-class ImpalaBackendClient;
-typedef ClientCache<ImpalaBackendClient> ImpalaBackendClientCache;
-typedef ClientConnection<ImpalaBackendClient> ImpalaBackendConnection;
 }
 
 #endif
diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index 1b969a1..440a5f0 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -31,7 +31,6 @@
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.inline.h"
 #include "rpc/sidecar-util.h"
-#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator-filter-state.h"
 #include "runtime/debug-options.h"
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 57fa713..06b9dd6 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -38,7 +38,6 @@
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/descriptors.h"
 #include "runtime/client-cache.h"
-#include "runtime/backend-client.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "service/data-stream-service.h"
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 9c1e15b..3645504 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -30,7 +30,6 @@
 #include "exec/kudu-util.h"
 #include "kudu/rpc/service_if.h"
 #include "rpc/rpc-mgr.h"
-#include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
@@ -228,23 +227,20 @@ struct ExecEnv::KuduClientPtr {
 ExecEnv* ExecEnv::exec_env_ = nullptr;
 
 ExecEnv::ExecEnv()
-  : ExecEnv(FLAGS_be_port, FLAGS_krpc_port,
-        FLAGS_state_store_subscriber_port, FLAGS_webserver_port,
+  : ExecEnv(FLAGS_krpc_port, FLAGS_state_store_subscriber_port, 
FLAGS_webserver_port,
         FLAGS_state_store_host, FLAGS_state_store_port) {}
 
-ExecEnv::ExecEnv(int backend_port, int krpc_port,
-    int subscriber_port, int webserver_port, const string& statestore_host,
-    int statestore_port)
+ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
+    const string& statestore_host, int statestore_port)
   : obj_pool_(new ObjectPool),
     metrics_(new MetricGroup("impala-metrics")),
     // Create the CatalogServiceClientCache with num_retries = 1 and wait_ms = 
0.
     // Connections are still retried, but the retry mechanism is driven by
     // DoRpcWithRetry. Clients should always use DoRpcWithRetry rather than 
DoRpc to
     // ensure that both RPCs and connections are retried.
-    catalogd_client_cache_(
-        new CatalogServiceClientCache(1, 0,
-            FLAGS_catalog_client_rpc_timeout_ms, 
FLAGS_catalog_client_rpc_timeout_ms, "",
-            !FLAGS_ssl_client_ca_certificate.empty())),
+    catalogd_client_cache_(new CatalogServiceClientCache(1, 0,
+        FLAGS_catalog_client_rpc_timeout_ms, 
FLAGS_catalog_client_rpc_timeout_ms, "",
+        !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new io::DiskIoMgr()),
     webserver_(new Webserver(FLAGS_webserver_interface, webserver_port, 
metrics_.get())),
@@ -256,7 +252,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
     query_exec_mgr_(new QueryExecMgr()),
     rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
-    configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, 
backend_port)) {
+    configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, krpc_port)) 
{
   UUIDToUniqueIdPB(boost::uuids::random_generator()(), &backend_id_);
 
   // Resolve hostname to IP address.
@@ -275,9 +271,10 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
   TNetworkAddress statestore_address =
       MakeNetworkAddress(statestore_host, statestore_port);
 
-  statestore_subscriber_.reset(new StatestoreSubscriber(
-      Substitute("impalad@$0", 
TNetworkAddressToString(configured_backend_address_)),
-      subscriber_address, statestore_address, metrics_.get()));
+  // Set StatestoreSubscriber::subscriber_id as hostname + be_port.
+  statestore_subscriber_.reset(
+      new StatestoreSubscriber(Substitute("impalad@$0:$1", FLAGS_hostname, 
FLAGS_be_port),
+          subscriber_address, statestore_address, metrics_.get()));
 
   if (FLAGS_is_coordinator) {
     hdfs_op_thread_pool_.reset(
@@ -650,11 +647,6 @@ void ExecEnv::InitSystemStateInfo() {
   });
 }
 
-TNetworkAddress ExecEnv::GetThriftBackendAddress() const {
-  DCHECK(impala_server_ != nullptr);
-  return impala_server_->GetThriftBackendAddress();
-}
-
 Status ExecEnv::GetKuduClient(
     const vector<string>& master_addresses, kudu::client::KuduClient** client) 
{
   string master_addr_concat = join(master_addresses, ",");
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index d0f9125..0d40ac0 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -82,9 +82,8 @@ class ExecEnv {
  public:
   ExecEnv();
 
-  ExecEnv(int backend_port, int krpc_port,
-      int subscriber_port, int webserver_port, const std::string& 
statestore_host,
-      int statestore_port);
+  ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
+      const std::string& statestore_host, int statestore_port);
 
   /// Returns the most recently created exec env instance. In a normal 
impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
@@ -110,10 +109,6 @@ class ExecEnv {
   /// once.
   void SetImpalaServer(ImpalaServer* server);
 
-  /// Get the address of the thrift backend service. Only valid to call if
-  /// StartServices() was successful.
-  TNetworkAddress GetThriftBackendAddress() const;
-
   const BackendIdPB& backend_id() const { return backend_id_; }
 
   KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
@@ -149,6 +144,10 @@ class ExecEnv {
   AdmissionController* admission_controller() { return 
admission_controller_.get(); }
   StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
 
+  const TNetworkAddress& configured_backend_address() const {
+    return configured_backend_address_;
+  }
+
   const IpAddr& ip_address() const { return ip_address_; }
 
   const TNetworkAddress& krpc_address() const { return krpc_address_; }
@@ -230,14 +229,14 @@ class ExecEnv {
   static ExecEnv* exec_env_;
   bool is_fe_tests_ = false;
 
-  /// Address of the thrift based ImpalaInternalService. In backend tests we 
allow
-  /// wildcard port 0, so this may not be the actual backend address.
+  /// The network address that the backend KRPC service is listening on:
+  /// hostname + krpc_port.
   TNetworkAddress configured_backend_address_;
 
   /// Resolved IP address of the host name.
   IpAddr ip_address_;
 
-  /// Address of the KRPC-based ImpalaInternalService
+  /// IP address of the KRPC backend service: ip_address + krpc_port.
   TNetworkAddress krpc_address_;
 
   /// fs.defaultFs value set in core-site.xml
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index 6c566a6..5b865f3 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -35,7 +35,6 @@
 #include "exec/scan-node.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "kudu/rpc/rpc_context.h"
-#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-state.h"
diff --git a/be/src/runtime/fragment-instance-state.h 
b/be/src/runtime/fragment-instance-state.h
index ac8a73e..35f94ba 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -140,7 +140,6 @@ class FragmentInstanceState {
   const TUniqueId& instance_id() const { return 
instance_ctx_.fragment_instance_id; }
   FInstanceExecStatePB current_state() const { return current_state_.Load(); }
   bool final_report_sent() const { return final_report_sent_; }
-  const TNetworkAddress& coord_address() const { return 
query_ctx().coord_address; }
   bool IsDone() const { return current_state_.Load() == 
FInstanceExecStatePB::FINISHED; }
   ObjectPool* obj_pool();
   int64_t scan_ranges_complete() const { return scan_ranges_complete_; }
diff --git a/be/src/runtime/initial-reservations.cc 
b/be/src/runtime/initial-reservations.cc
index ed2d1f2..262415a 100644
--- a/be/src/runtime/initial-reservations.cc
+++ b/be/src/runtime/initial-reservations.cc
@@ -33,7 +33,7 @@
 
 using std::numeric_limits;
 
-DECLARE_int32(be_port);
+DECLARE_int32(krpc_port);
 DECLARE_string(hostname);
 
 namespace impala {
@@ -59,7 +59,7 @@ Status InitialReservations::Init(
           query_min_reservation, &reservation_status)) {
     return Status(TErrorCode::MINIMUM_RESERVATION_UNAVAILABLE,
         PrettyPrinter::Print(query_min_reservation, TUnit::BYTES), 
FLAGS_hostname,
-        FLAGS_be_port, PrintId(query_id), reservation_status.GetDetail());
+        FLAGS_krpc_port, PrintId(query_id), reservation_status.GetDetail());
   }
   VLOG(2) << "Successfully claimed initial reservations ("
           << PrettyPrinter::Print(query_min_reservation, TUnit::BYTES) << ") 
for"
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index d12f47b..8c5e978 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -73,7 +73,8 @@ Status QueryExecMgr::StartQuery(const 
ExecQueryFInstancesRequestPB* request,
     const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
   TUniqueId query_id = query_ctx.query_id;
   VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
-          << " coord=" << TNetworkAddressToString(query_ctx.coord_address);
+          << " coord=" << query_ctx.coord_hostname << ":"
+          << query_ctx.coord_ip_address.port;
   bool dummy;
   QueryState* qs =
       GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), 
&dummy);
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index a6ded7e..c9768d3 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -29,13 +29,13 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.h"
-#include "runtime/backend-client.h"
+#include "rpc/thrift-util.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
-#include "runtime/fragment-state.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/fragment-state.h"
 #include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
@@ -268,8 +268,8 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* 
exec_rpc_params,
   RETURN_IF_ERROR(InitBufferPoolState());
 
   // Initialize the RPC proxy once and report any error.
-  RETURN_IF_ERROR(ControlService::GetProxy(query_ctx().coord_krpc_address,
-      query_ctx().coord_address.hostname, &proxy_));
+  RETURN_IF_ERROR(ControlService::GetProxy(
+      query_ctx().coord_ip_address, query_ctx().coord_hostname, &proxy_));
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
diff --git a/be/src/runtime/runtime-filter-bank.cc 
b/be/src/runtime/runtime-filter-bank.cc
index 8ed1e38..994715f 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,7 +27,6 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
-#include "runtime/backend-client.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
@@ -271,18 +270,17 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
       DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
       min_max_filter->ToProtobuf(params.mutable_min_max_filter());
     }
-    const TNetworkAddress& krpc_address = 
query_state_->query_ctx().coord_krpc_address;
-    const TNetworkAddress& host_address = 
query_state_->query_ctx().coord_address;
+    const TNetworkAddress& krpc_address = 
query_state_->query_ctx().coord_ip_address;
+    const std::string& hostname = query_state_->query_ctx().coord_hostname;
 
     // Use 'proxy' to send the filter to the coordinator.
     unique_ptr<DataStreamServiceProxy> proxy;
-    Status get_proxy_status =
-        DataStreamService::GetProxy(krpc_address, host_address.hostname, 
&proxy);
+    Status get_proxy_status = DataStreamService::GetProxy(krpc_address, 
hostname, &proxy);
     if (!get_proxy_status.ok()) {
       // Failing to send a filter is not a query-wide error - the remote 
fragment will
       // continue regardless.
-      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
-          host_address.hostname, get_proxy_status.msg().msg());
+      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1", 
hostname,
+          get_proxy_status.msg().msg());
       return;
     }
     // Increment 'num_inflight_rpcs_' to make sure that the filter will not be 
deallocated
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 9b75002..a315144 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -146,8 +146,8 @@ Status TestEnv::CreateQueryState(
   query_ctx.query_id.hi = 0;
   query_ctx.query_id.lo = query_id;
   query_ctx.request_pool = "test-pool";
-  query_ctx.coord_address = exec_env_->configured_backend_address_;
-  query_ctx.coord_krpc_address = exec_env_->krpc_address_;
+  query_ctx.coord_hostname = exec_env_->configured_backend_address_.hostname;
+  query_ctx.coord_ip_address = exec_env_->krpc_address_;
   query_ctx.coord_backend_id.hi = 0;
   query_ctx.coord_backend_id.lo = 0;
   TQueryOptions* query_options_to_use = 
&query_ctx.client_request.query_options;
diff --git a/be/src/scheduling/scheduler-test-util.h 
b/be/src/scheduling/scheduler-test-util.h
index 9595b72..2b6cab0 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -25,7 +25,6 @@
 #include <boost/scoped_ptr.hpp>
 
 #include "common/status.h"
-#include "gen-cpp/ImpalaInternalService.h" // for TQueryOptions
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/scheduler.h"
 #include "util/metrics.h"
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 8f030ad..7b7dfef 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -38,7 +38,6 @@ add_library(Service
   impala-beeswax-server.cc
   impala-hs2-server.cc
   impala-http-handler.cc
-  impala-internal-service.cc
   impalad-main.cc
   impala-server.cc
   query-options.cc
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 931f865..aa8fa72 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -32,7 +32,6 @@
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "rpc/rpc-mgr.inline.h"
-#include "runtime/backend-client.h"
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
@@ -158,7 +157,7 @@ ClientRequestState::ClientRequestState(const TQueryCtx& 
query_ctx, Frontend* fro
   summary_profile_->AddInfoStringRedacted(
       "Sql Statement", query_ctx_.client_request.stmt);
   summary_profile_->AddInfoString("Coordinator",
-      
TNetworkAddressToString(ExecEnv::GetInstance()->GetThriftBackendAddress()));
+      
TNetworkAddressToString(ExecEnv::GetInstance()->configured_backend_address()));
 
   summary_profile_->AddChild(frontend_profile_);
 
diff --git a/be/src/service/control-service.cc 
b/be/src/service/control-service.cc
index 34c3800..3db1055 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -141,7 +141,8 @@ void ControlService::ExecQueryFInstances(const 
ExecQueryFInstancesRequestPB* req
   ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_ctx.query_id);
   VLOG_QUERY << "ExecQueryFInstances():"
              << " query_id=" << PrintId(query_ctx.query_id)
-             << " coord=" << TNetworkAddressToString(query_ctx.coord_address)
+             << " coord=" << query_ctx.coord_hostname << ":"
+             << query_ctx.coord_ip_address.port
              << " #instances=" << fragment_info.fragment_instance_ctxs.size();
   Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
       request, query_ctx, fragment_info);
diff --git a/be/src/service/impala-internal-service.cc 
b/be/src/service/impala-internal-service.cc
deleted file mode 100644
index 1db4bba..0000000
--- a/be/src/service/impala-internal-service.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "service/impala-internal-service.h"
-
-#include <boost/lexical_cast.hpp>
-
-#include "common/status.h"
-#include "gutil/strings/substitute.h"
-#include "service/impala-server.h"
-#include "runtime/query-state.h"
-#include "runtime/fragment-instance-state.h"
-#include "runtime/exec-env.h"
-#include "util/debug-util.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-DECLARE_string(debug_actions);
-
-ImpalaInternalService::ImpalaInternalService() {
-  impala_server_ = ExecEnv::GetInstance()->impala_server();
-  DCHECK(impala_server_ != nullptr);
-}
-
-template <typename T> void SetUnknownIdError(
-    const string& id_type, const TUniqueId& id, T* status_container) {
-  Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
-      Substitute("Unknown $0 id: $1", id_type, PrintId(id))));
-  status.SetTStatus(status_container);
-}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
deleted file mode 100644
index 425678b..0000000
--- a/be/src/service/impala-internal-service.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
-#define IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
-
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-
-namespace impala {
-
-class ImpalaServer;
-
-/// Proxies Thrift RPC requests onto their implementing objects for the
-/// ImpalaInternalService service.
-class ImpalaInternalService : public ImpalaInternalServiceIf {
- public:
-  ImpalaInternalService();
-
- private:
-  ImpalaServer* impala_server_;
-};
-
-}
-
-#endif
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 3c75851..ad0c3fc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -57,6 +57,7 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
+#include "rpc/rpc-mgr.h"
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-thread.h"
 #include "rpc/thrift-util.h"
@@ -64,16 +65,15 @@
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
+#include "runtime/query-driver.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
-#include "runtime/query-driver.h"
 #include "scheduling/admission-controller.h"
 #include "service/cancellation-work.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
 #include "service/impala-http-handler.h"
-#include "service/impala-internal-service.h"
 #include "util/auth-util.h"
 #include "util/bit-util.h"
 #include "util/coding-util.h"
@@ -566,15 +566,6 @@ bool ImpalaServer::IsExecutor() { return is_executor_; }
 
 bool ImpalaServer::IsHealthy() { return services_started_.load(); }
 
-int ImpalaServer::GetThriftBackendPort() {
-  DCHECK(thrift_be_server_ != nullptr);
-  return thrift_be_server_->port();
-}
-
-TNetworkAddress ImpalaServer::GetThriftBackendAddress() {
-  return MakeNetworkAddress(FLAGS_hostname, GetThriftBackendPort());
-}
-
 int ImpalaServer::GetBeeswaxPort() {
   DCHECK(beeswax_server_ != nullptr);
   return beeswax_server_->port();
@@ -1145,12 +1136,12 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
 }
 
 void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
-  PrepareQueryContext(GetThriftBackendAddress(),
-      ExecEnv::GetInstance()->krpc_address(), query_ctx);
+  PrepareQueryContext(exec_env_->configured_backend_address().hostname,
+      exec_env_->krpc_address(), query_ctx);
 }
 
-void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
-    const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx) {
+void ImpalaServer::PrepareQueryContext(
+    const std::string& hostname, const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx) {
   query_ctx->__set_pid(getpid());
   int64_t now_us = UnixMicros();
   const Timezone& utc_tz = TimezoneDatabase::GetUtcTimezone();
@@ -1177,8 +1168,8 @@ void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
     query_ctx->__set_now_string(query_ctx->client_request.query_options.now_string);
   }
   query_ctx->__set_start_unix_millis(now_us / MICROS_PER_MILLI);
-  query_ctx->__set_coord_address(backend_addr);
-  query_ctx->__set_coord_krpc_address(krpc_addr);
+  query_ctx->__set_coord_hostname(hostname);
+  query_ctx->__set_coord_ip_address(krpc_addr);
   TUniqueId backend_id;
   UniqueIdPBToTUniqueId(ExecEnv::GetInstance()->backend_id(), &backend_id);
   query_ctx->__set_coord_backend_id(backend_id);
@@ -2131,7 +2122,8 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(BackendDescriptorPB* be_d
   bool is_quiescing = shutting_down_.Load() != 0;
 
   *be_desc->mutable_backend_id() = exec_env_->backend_id();
-  *be_desc->mutable_address() = FromTNetworkAddress(exec_env_->GetThriftBackendAddress());
+  *be_desc->mutable_address() =
+      FromTNetworkAddress(exec_env_->configured_backend_address());
   be_desc->set_ip_address(exec_env_->ip_address());
   be_desc->set_is_coordinator(FLAGS_is_coordinator);
   be_desc->set_is_executor(FLAGS_is_executor);
@@ -2661,8 +2653,8 @@ void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
   crs->set_expired();
 }
 
-Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
-    int32_t hs2_http_port) {
+Status ImpalaServer::Start(
+    int32_t beeswax_port, int32_t hs2_port, int32_t hs2_http_port) {
   exec_env_->SetImpalaServer(this);
 
   // We must register the HTTP handlers after registering the ImpalaServer with the
@@ -2695,32 +2687,9 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
   }
 
-  // Start the internal service.
-  if (thrift_be_port > 0 || (TestInfo::is_test() && thrift_be_port == 0)) {
-    boost::shared_ptr<ImpalaInternalService> thrift_if(new 
ImpalaInternalService());
-    boost::shared_ptr<TProcessor> be_processor(
-        new ImpalaInternalServiceProcessor(thrift_if));
-    boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("backend", exec_env_->metrics()));
-    be_processor->setEventHandler(event_handler);
-
-    ThriftServerBuilder be_builder("backend", be_processor, thrift_be_port);
-
-    if (IsInternalTlsConfigured()) {
-      LOG(INFO) << "Enabling SSL for backend";
-      be_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
-          .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
-          .ssl_version(ssl_version)
-          .cipher_list(FLAGS_ssl_cipher_list);
-    }
-    ThriftServer* server;
-    RETURN_IF_ERROR(be_builder.metrics(exec_env_->metrics()).Build(&server));
-    thrift_be_server_.reset(server);
-  }
-
   if (!FLAGS_is_coordinator) {
     LOG(INFO) << "Initialized executor Impala server on "
-              << TNetworkAddressToString(GetThriftBackendAddress());
+              << TNetworkAddressToString(exec_env_->configured_backend_address());
   } else {
     // Initialize the client servers.
     boost::shared_ptr<ImpalaServer> handler = shared_from_this();
@@ -2813,14 +2782,10 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
     }
   }
   LOG(INFO) << "Initialized coordinator/executor Impala server on "
-      << TNetworkAddressToString(GetThriftBackendAddress());
+            << TNetworkAddressToString(exec_env_->configured_backend_address());
 
   // Start the RPC services.
   RETURN_IF_ERROR(exec_env_->StartKrpcService());
-  if (thrift_be_server_.get()) {
-    RETURN_IF_ERROR(thrift_be_server_->Start());
-    LOG(INFO) << "Impala InternalService listening on " << 
thrift_be_server_->port();
-  }
   if (hs2_server_.get()) {
     RETURN_IF_ERROR(hs2_server_->Start());
     LOG(INFO) << "Impala HiveServer2 Service listening on " << 
hs2_server_->port();
@@ -2845,8 +2810,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
 void ImpalaServer::Join() {
   // The server shuts down by exiting the process, so just block here until 
the process
   // exits.
-  thrift_be_server_->Join();
-  thrift_be_server_.reset();
+  exec_env_->rpc_mgr()->Join();
 
   if (FLAGS_is_coordinator) {
     beeswax_server_->Join();
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 52e0a85..03b0612 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -31,7 +31,6 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/control_service.pb.h"
 #include "kudu/util/random.h"
@@ -209,9 +208,8 @@ class ImpalaServer : public ImpalaServiceIf,
   /// the port the server run on. A port value of 0 means to choose an arbitrary
   /// ephemeral port in tests and to not start the service in a daemon. A port < 0
   /// always means to not start the service. The port values can be obtained after
-  /// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or GetHS2Port().
-  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
-      int32_t hs2_http_port);
+  /// Start() by calling GetBeeswaxPort() or GetHS2Port().
+  Status Start(int32_t beeswax_port, int32_t hs2_port, int32_t hs2_http_port);
 
   /// Blocks until the server shuts down.
   void Join();
@@ -361,7 +359,7 @@ class ImpalaServer : public ImpalaServiceIf,
   void PrepareQueryContext(TQueryCtx* query_ctx);
 
   /// Static helper for PrepareQueryContext() that is used from expr-benchmark.
-  static void PrepareQueryContext(const TNetworkAddress& backend_addr,
+  static void PrepareQueryContext(const std::string& hostname,
       const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx);
 
   /// SessionHandlerIf methods
@@ -442,14 +440,6 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Returns whether this backend is healthy, i.e. able to accept queries.
   bool IsHealthy();
 
-  /// Returns the port that the thrift backend server is listening on. Valid 
to call after
-  /// the server has started successfully.
-  int GetThriftBackendPort();
-
-  /// Returns the network address that the thrift backend server is listening 
on. Valid
-  /// to call after the server has started successfully.
-  TNetworkAddress GetThriftBackendAddress();
-
   /// Returns the port that the Beeswax server is listening on. Valid to call 
after
   /// the server has started successfully.
   int GetBeeswaxPort();
@@ -1579,7 +1569,6 @@ class ImpalaServer : public ImpalaServiceIf,
   boost::scoped_ptr<ThriftServer> beeswax_server_;
   boost::scoped_ptr<ThriftServer> hs2_server_;
   boost::scoped_ptr<ThriftServer> hs2_http_server_;
-  boost::scoped_ptr<ThriftServer> thrift_be_server_;
 
   /// Flag that records if backend and/or client services have been started. 
The flag is
   /// set after all services required for the server have been started.
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 22db7fe..9704067 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -31,7 +31,6 @@
 #include "exec/hbase-table-writer.h"
 #include "exprs/hive-udf-call.h"
 #include "exprs/timezone_db.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaService.h"
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-server.h"
@@ -54,7 +53,6 @@ using namespace impala;
 DECLARE_int32(beeswax_port);
 DECLARE_int32(hs2_port);
 DECLARE_int32(hs2_http_port);
-DECLARE_int32(be_port);
 DECLARE_bool(is_coordinator);
 
 int ImpaladMain(int argc, char** argv) {
@@ -85,8 +83,8 @@ int ImpaladMain(int argc, char** argv) {
   InitRpcEventTracing(exec_env.webserver(), exec_env.rpc_mgr());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
-  Status status = impala_server->Start(
-      FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port);
+  Status status =
+      impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port, 
FLAGS_hs2_http_port);
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
         << status.GetDetail();
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index 7dd68ad..7877a86 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -37,7 +37,6 @@ using namespace impala;
 
 DECLARE_bool(abort_on_config_error);
 DECLARE_int32(idle_session_timeout);
-DECLARE_int32(be_port);
 DECLARE_int32(beeswax_port);
 
 // TODO: When sleep(..) queries can be cancelled, write a test that confirms 
long-running
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index a9bdfeb..337932d 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -37,7 +37,6 @@
 
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
-DECLARE_int32(be_port);
 DECLARE_int32(krpc_port);
 
 using namespace apache::thrift;
@@ -45,31 +44,27 @@ using namespace impala;
 
 Status InProcessImpalaServer::StartWithEphemeralPorts(const string& 
statestore_host,
     int statestore_port, InProcessImpalaServer** server) {
-  // These flags are read directly in several places to find the address of 
the local
-  // backend interface.
-  FLAGS_be_port = 0;
-  // Thrift server ctor allows port to be set to 0. Not supported with KRPC.
-  // So KRPC port must be explicitly set here.
+  // This flag is read directly in several places to find the address of the 
backend
+  // interface, so we must set it here.
   FLAGS_krpc_port = FindUnusedEphemeralPort();
 
-  // Use wildcard addresses of 0 so that the Thrift servers will pick their 
own port.
-  *server = new InProcessImpalaServer(FLAGS_hostname, 0, FLAGS_krpc_port, 0, 0,
-      statestore_host, statestore_port);
+  *server = new InProcessImpalaServer(
+      FLAGS_hostname, FLAGS_krpc_port, 0, 0, statestore_host, statestore_port);
   // Start the daemon and check if it works, if not delete the current server 
object and
   // pick a new set of ports
   return (*server)->StartWithClientServers(0, 0, 0);
 }
 
-InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend_port,
-    int krpc_port, int subscriber_port, int webserver_port, const string& statestore_host,
+InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int krpc_port,
+    int subscriber_port, int webserver_port, const string& statestore_host,
     int statestore_port)
-  : backend_port_(backend_port),
+  : krpc_port_(krpc_port),
     beeswax_port_(0),
     hs2_port_(0),
     hs2_http_port_(0),
     impala_server_(NULL),
-    exec_env_(new ExecEnv(backend_port, krpc_port, subscriber_port, webserver_port,
-        statestore_host, statestore_port)) {}
+    exec_env_(new ExecEnv(
+        krpc_port, subscriber_port, webserver_port, statestore_host, statestore_port)) {}
 
 void InProcessImpalaServer::SetCatalogIsReady() {
   DCHECK(impala_server_ != NULL) << "Call Start*() first.";
@@ -85,15 +80,10 @@ Status InProcessImpalaServer::StartWithClientServers(
 
   impala_server_.reset(new ImpalaServer(exec_env_.get()));
   SetCatalogIsReady();
-  RETURN_IF_ERROR(
-      impala_server_->Start(backend_port_, beeswax_port, hs2_port, hs2_http_port_));
-
-  // This flag is read directly in several places to find the address of the local
-  // backend interface.
-  FLAGS_be_port = impala_server_->GetThriftBackendPort();
+  RETURN_IF_ERROR(impala_server_->Start(beeswax_port, hs2_port, hs2_http_port));
 
   // Wait for up to 1s for the backend server to start
-  RETURN_IF_ERROR(WaitForServer(FLAGS_hostname, FLAGS_be_port, 10, 100));
+  RETURN_IF_ERROR(WaitForServer(FLAGS_hostname, krpc_port_, 10, 100));
   return Status::OK();
 }
 
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 3902520..71f480c 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -42,9 +42,8 @@ class InProcessImpalaServer {
  public:
   /// Initialises the server, but does not start any network-attached
   /// services or run any threads.
-  InProcessImpalaServer(const std::string& hostname, int backend_port, int 
krpc_port,
-                        int subscriber_port, int webserver_port,
-                        const std::string& statestore_host, int 
statestore_port);
+  InProcessImpalaServer(const std::string& hostname, int krpc_port, int 
subscriber_port,
+      int webserver_port, const std::string& statestore_host, int 
statestore_port);
 
   /// Starts an in-process Impala server with ephemeral ports that are 
independent of the
   /// ports used by a concurrently running normal Impala daemon. The hostname 
is set to
@@ -80,7 +79,7 @@ class InProcessImpalaServer {
   int GetHS2Port() const;
 
  private:
-  uint32_t backend_port_;
+  uint32_t krpc_port_;
 
   uint32_t beeswax_port_;
 
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index f31f895..d2d7eb2 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -64,7 +64,7 @@ using boost::tokenizer;
 using namespace beeswax;
 using namespace parquet;
 
-DECLARE_int32(be_port);
+DECLARE_int32(krpc_port);
 DECLARE_string(hostname);
 
 namespace impala {
@@ -314,7 +314,7 @@ string GetStackTrace() {
 }
 
 string GetBackendString() {
-  return Substitute("$0:$1", FLAGS_hostname, FLAGS_be_port);
+  return Substitute("$0:$1", FLAGS_hostname, FLAGS_krpc_port);
 }
 
 DebugActionTokens TokenizeDebugActions(const string& debug_actions) {
diff --git a/bin/generate_minidump_collection_testdata.py 
b/bin/generate_minidump_collection_testdata.py
index 021941b..2cee8b9 100755
--- a/bin/generate_minidump_collection_testdata.py
+++ b/bin/generate_minidump_collection_testdata.py
@@ -49,7 +49,6 @@ options, args = parser.parse_args()
 
 CONFIG_FILE = '''-beeswax_port=21000
 -fe_port=21000
--be_port=22000
 -hs2_port=21050
 -enable_webserver=true
 -mem_limit=108232130560
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index fe0e16d..27716ed 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -533,12 +533,12 @@ struct TQueryCtx {
   // Process ID of the impalad to which the user is connected.
   5: required i32 pid
 
-  // The initiating coordinator's address of its thrift based 
ImpalaInternalService.
+  // The coordinator's hostname.
   // TODO: determine whether we can get this somehow via the Thrift rpc 
mechanism.
-  6: optional Types.TNetworkAddress coord_address
+  6: optional string coord_hostname
 
   // The initiating coordinator's address of its KRPC based 
ImpalaInternalService.
-  7: optional Types.TNetworkAddress coord_krpc_address
+  7: optional Types.TNetworkAddress coord_ip_address
 
   // List of tables missing relevant table and/or column stats. Used for
   // populating query-profile fields consumed by CM as well as warning 
messages.
@@ -785,7 +785,3 @@ struct TParseDateStringResult {
   // parsed date string was not in a canonical form.
   3: optional string canonical_date_string
 }
-
-service ImpalaInternalService {
-
-}
diff --git a/tests/custom_cluster/test_blacklist.py 
b/tests/custom_cluster/test_blacklist.py
index 78ec93e..8d24161 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -64,7 +64,8 @@ class TestBlacklist(CustomClusterTestSuite):
     backends_json = 
self.cluster.impalads[0].service.get_debug_webpage_json("/backends")
     match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
     assert match.group(1) == "%s:%s" % \
-        (killed_impalad.hostname, killed_impalad.service.be_port), result.runtime_profile
+        (killed_impalad.hostname, killed_impalad.service.krpc_port), \
+        result.runtime_profile
     assert backends_json["num_blacklisted_backends"] == 1, backends_json
     assert backends_json["num_active_backends"] == 2, backends_json
     assert len(backends_json["backends"]) == 3, backends_json
@@ -111,7 +112,8 @@ class TestBlacklist(CustomClusterTestSuite):
     result = self.execute_query("select count(*) from tpch.lineitem")
     match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
     assert match.group(1) == "%s:%s" % \
-        (killed_impalad.hostname, killed_impalad.service.be_port), 
result.runtime_profile
+        (killed_impalad.hostname, killed_impalad.service.krpc_port), \
+        result.runtime_profile
 
     # Restart the impalad.
     killed_impalad.start()
@@ -165,5 +167,5 @@ class TestBlacklist(CustomClusterTestSuite):
     result = self.execute_query("select count(*) from tpch.lineitem")
     match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
     assert match is not None and match.group(1) == "%s:%s" % \
-      (killed_impalad.hostname, killed_impalad.service.be_port), \
+      (killed_impalad.hostname, killed_impalad.service.krpc_port), \
       result.runtime_profile
diff --git a/tests/custom_cluster/test_process_failures.py 
b/tests/custom_cluster/test_process_failures.py
index 4e106ef..0bf5b2a 100644
--- a/tests/custom_cluster/test_process_failures.py
+++ b/tests/custom_cluster/test_process_failures.py
@@ -155,7 +155,7 @@ class TestProcessFailures(CustomClusterTestSuite):
     # Assert that the query status on the query profile web page contains the 
expected
     # failed hostport.
     failed_hostport = "%s:%s" % (worker_impalad.service.hostname,
-                                 worker_impalad.service.be_port)
+                                 worker_impalad.service.krpc_port)
     query_profile_page = impalad.service.read_query_profile_page(query_id)
     assert failed_hostport in query_profile_page,\
         "Query status did not contain expected hostport %s\n\n%s" % 
(failed_hostport,
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 95ee8df..45caa25 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -944,7 +944,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     """Validate that the given profile indicates that the given impalad was 
blacklisted
     during query execution."""
     assert "Blacklisted Executors: 
{0}:{1}".format(blacklisted_impalad.hostname,
-        blacklisted_impalad.service.be_port) in profile, profile
+        blacklisted_impalad.service.krpc_port) in profile, profile
 
   def __assert_executors_not_blacklisted(self, impalad, profile):
     """Validate that the given profile indicates that the given impalad was not
diff --git a/tests/custom_cluster/test_restart_services.py 
b/tests/custom_cluster/test_restart_services.py
index 22bbbf9..d08c293 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -277,12 +277,11 @@ class TestGracefulShutdown(CustomClusterTestSuite, 
HS2TestSuite):
     assert ("This may be because the port specified is wrong.") not in str(ex)
 
     # Test that pointing to the wrong thrift service (the HS2 port) fails 
gracefully-ish.
-    thrift_ports = [21051, 22001]  # HS2 port, old backend port.
-    for port in thrift_ports:
-      ex = self.execute_query_expect_failure(self.client,
-          ":shutdown('localhost:{0}')".format(port))
-      assert ("failed with error 'RemoteShutdown() RPC failed") in str(ex)
-      assert ("This may be because the port specified is wrong.") in str(ex)
+    thrift_port = 21051  # HS2 port.
+    ex = self.execute_query_expect_failure(self.client,
+        ":shutdown('localhost:{0}')".format(thrift_port))
+    assert ("failed with error 'RemoteShutdown() RPC failed") in str(ex)
+    assert ("This may be because the port specified is wrong.") in str(ex)
 
     # Test RPC error handling with debug action.
     ex = self.execute_query_expect_failure(self.client, 
":shutdown('localhost:27001')",
diff --git a/tests/webserver/test_web_pages.py 
b/tests/webserver/test_web_pages.py
index 7cc693c..4026e67 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -661,8 +661,8 @@ class TestWebPage(ImpalaTestSuite):
 
     # The 'address' column is the backend port of the impalad.
     assert len(backend_row['address']) > 0
-    be_ports = ('22000', '22001', '22002')
-    assert backend_row['address'].endswith(be_ports)
+    krpc_ports = ('27000', '27001', '27002')
+    assert backend_row['address'].endswith(krpc_ports)
 
     # The 'krpc_address' is the krpc address of the impalad.
     assert len(backend_row['krpc_address']) > 0

Reply via email to