Repository: impala
Updated Branches:
  refs/heads/master bdd904922 -> 649f175df


cleanup: remove RuntimeState::exec_env_

Change-Id: I4a1b1bdd41e3d10982b3a4bdb0217e716b4df67f
Reviewed-on: http://gerrit.cloudera.org:8080/11269
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fccaa723
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fccaa723
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fccaa723

Branch: refs/heads/master
Commit: fccaa72366e972228178687adcb0ee1d8e8c5930
Parents: bdd9049
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Sun Aug 19 00:08:16 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Wed Aug 22 06:07:51 2018 +0000

----------------------------------------------------------------------
 be/src/exec/grouping-aggregator.cc           |  2 +-
 be/src/exec/hbase-scan-node.cc               |  4 +++-
 be/src/exec/hbase-table-writer.cc            |  3 ++-
 be/src/exec/hdfs-orc-scanner.h               |  3 ++-
 be/src/exec/hdfs-scan-node-base.cc           | 10 +++++----
 be/src/exec/kudu-scan-node-base.cc           |  2 +-
 be/src/exec/kudu-table-sink.cc               |  4 ++--
 be/src/exec/partitioned-hash-join-builder.cc |  4 ++--
 be/src/runtime/buffered-tuple-stream.cc      |  2 +-
 be/src/runtime/fragment-instance-state.cc    |  2 +-
 be/src/runtime/reservation-manager.cc        |  2 +-
 be/src/runtime/runtime-filter-bank.cc        |  4 ++--
 be/src/runtime/runtime-state.cc              | 26 ++---------------------
 be/src/runtime/runtime-state.h               |  8 -------
 14 files changed, 26 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc 
b/be/src/exec/grouping-aggregator.cc
index 42d6be8..0ac4926 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -210,7 +210,7 @@ Status GroupingAggregator::Open(RuntimeState* state) {
 
   if (ht_allocator_ == nullptr) {
     // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() 
call.
-    ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
+    ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(),
         buffer_pool_client(), resource_profile_.spillable_buffer_size));
 
     if (!is_streaming_preagg_ && needs_serialize_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index 5aee999..148be66 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
@@ -60,7 +61,8 @@ Status HBaseScanNode::Prepare(RuntimeState* state) {
   hbase_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HBASE_READ_TIMER);
   AddBytesReadCounters();
 
-  hbase_scanner_.reset(new HBaseTableScanner(this, state->htable_factory(), 
state));
+  hbase_scanner_.reset(
+      new HBaseTableScanner(this, ExecEnv::GetInstance()->htable_factory(), 
state));
 
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
   if (tuple_desc_ == NULL) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hbase-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-writer.cc 
b/be/src/exec/hbase-table-writer.cc
index 2723eb7..cf71522 100644
--- a/be/src/exec/hbase-table-writer.cc
+++ b/be/src/exec/hbase-table-writer.cc
@@ -23,6 +23,7 @@
 #include "common/logging.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "runtime/exec-env.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -53,7 +54,7 @@ HBaseTableWriter::HBaseTableWriter(HBaseTableDescriptor* 
table_desc,
     runtime_profile_(profile) { }
 
 Status HBaseTableWriter::Init(RuntimeState* state) {
-  RETURN_IF_ERROR(state->htable_factory()->GetTable(table_desc_->name(),
+  
RETURN_IF_ERROR(ExecEnv::GetInstance()->htable_factory()->GetTable(table_desc_->name(),
       &table_));
   encoding_timer_ = ADD_TIMER(runtime_profile_, "EncodingTimer");
   htable_put_timer_ = ADD_TIMER(runtime_profile_, "HTablePutTimer");

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hdfs-orc-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 132442e..5b9be5b 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -21,6 +21,7 @@
 
 #include <orc/OrcFile.hh>
 
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 #include "exec/hdfs-scanner.h"
@@ -81,7 +82,7 @@ class HdfsOrcScanner : public HdfsScanner {
     }
 
     uint64_t getNaturalReadSize() const {
-      return scanner_->state_->io_mgr()->max_buffer_size();
+      return ExecEnv::GetInstance()->disk_io_mgr()->max_buffer_size();
     }
 
     void read(void* buf, uint64_t length, uint64_t offset);

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index 6454652..09c2c0d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -340,7 +340,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   }
 
   RETURN_IF_ERROR(ClaimBufferReservation(state));
-  reader_context_ = runtime_state_->io_mgr()->RegisterContext();
+  reader_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
   hdfs_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
@@ -392,7 +392,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
       "MaxCompressedTextFileLength", TUnit::BYTES);
 
   hdfs_read_thread_concurrency_bucket_ = 
runtime_profile()->AddBucketingCounters(
-      &active_hdfs_read_thread_counter_, state->io_mgr()->num_total_disks() + 
1);
+      &active_hdfs_read_thread_counter_,
+      ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1);
 
   counters_running_ = true;
 
@@ -413,7 +414,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
   if (reader_context_ != nullptr) {
     // Need to wait for all the active scanner threads to finish to ensure 
there is no
     // more memory tracked by this scan node's mem tracker.
-    state->io_mgr()->UnregisterContext(reader_context_.get());
+    
ExecEnv::GetInstance()->disk_io_mgr()->UnregisterContext(reader_context_.get());
   }
 
   StopAndFinalizeCounters();
@@ -569,7 +570,8 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, 
const char* file,
   DCHECK_GE(len, 0);
   DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, 
file)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len=" << len 
<< ")";
-  disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, 
expected_local);
+  disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
+      file, disk_id, expected_local);
 
   ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
   range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, 
metadata);

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc 
b/be/src/exec/kudu-scan-node-base.cc
index 0e7cdfa..b57f4ee 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -92,7 +92,7 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
   const KuduTableDescriptor* table_desc =
       static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
 
-  RETURN_IF_ERROR(runtime_state_->exec_env()->GetKuduClient(
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
       table_desc->kudu_master_addresses(), &client_));
 
   uint64_t latest_ts = static_cast<uint64_t>(

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 32ba2bb..b50394d 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -120,8 +120,8 @@ Status KuduTableSink::Open(RuntimeState* state) {
   }
   client_tracked_bytes_ = required_mem;
 
-  RETURN_IF_ERROR(
-      state->exec_env()->GetKuduClient(table_desc_->kudu_master_addresses(), 
&client_));
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
+      table_desc_->kudu_master_addresses(), &client_));
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
       "Unable to open Kudu table");
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc 
b/be/src/exec/partitioned-hash-join-builder.cc
index c627b98..c2980b5 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -149,8 +149,8 @@ Status PhjBuilder::Open(RuntimeState* state) {
   }
   if (ht_allocator_ == nullptr) {
     // Create 'ht_allocator_' on the first call to Open().
-    ht_allocator_.reset(new Suballocator(
-        state->exec_env()->buffer_pool(), buffer_pool_client_, 
spillable_buffer_size_));
+    ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(),
+        buffer_pool_client_, spillable_buffer_size_));
   }
   RETURN_IF_ERROR(CreateHashPartitions(0));
   AllocateRuntimeFilters();

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc 
b/be/src/runtime/buffered-tuple-stream.cc
index 71175d1..e0cf854 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -54,7 +54,7 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& 
ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
-    buffer_pool_(state->exec_env()->buffer_pool()),
+    buffer_pool_(ExecEnv::GetInstance()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
     read_page_reservation_(buffer_pool_client_),
     write_page_reservation_(buffer_pool_client_),

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index 51ff13d..700a391 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -123,7 +123,7 @@ void FragmentInstanceState::Cancel() {
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->set_is_cancelled();
   if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
-  runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
+  
ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 
 Status FragmentInstanceState::Prepare() {

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/reservation-manager.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/reservation-manager.cc 
b/be/src/runtime/reservation-manager.cc
index 1712e09..c87f92c 100644
--- a/be/src/runtime/reservation-manager.cc
+++ b/be/src/runtime/reservation-manager.cc
@@ -44,7 +44,7 @@ void ReservationManager::Close(RuntimeState* state) {
     VLOG_FILE << name_ << " returning reservation " << 
resource_profile_.min_reservation;
     state->query_state()->initial_reservations()->Return(
         &buffer_pool_client_, resource_profile_.min_reservation);
-    state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+    
ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc 
b/be/src/runtime/runtime-filter-bank.cc
index 64638a6..e1a2512 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -61,7 +61,7 @@ Status RuntimeFilterBank::ClaimBufferReservation() {
   DCHECK(!buffer_pool_client_.is_registered());
   string filter_bank_name = Substitute(
       "RuntimeFilterBank (Fragment Id: $0)", 
PrintId(state_->fragment_instance_id()));
-  
RETURN_IF_ERROR(state_->exec_env()->buffer_pool()->RegisterClient(filter_bank_name,
+  
RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(filter_bank_name,
       state_->query_state()->file_group(), 
state_->instance_buffer_reservation(),
       filter_mem_tracker_.get(), total_bloom_filter_mem_required_,
       state_->runtime_profile(), &buffer_pool_client_));
@@ -268,7 +268,7 @@ void RuntimeFilterBank::Close() {
               << ") returning reservation " << 
total_bloom_filter_mem_required_;
     state_->query_state()->initial_reservations()->Return(
         &buffer_pool_client_, total_bloom_filter_mem_required_);
-    state_->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+    
ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
   DCHECK_EQ(filter_mem_tracker_->consumption(), 0);
   filter_mem_tracker_->Close();

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 5bb5a2d..dedb5f5 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -73,7 +73,6 @@ RuntimeState::RuntimeState(QueryState* query_state, const 
TPlanFragmentCtx& frag
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(
         query_state->query_ctx().utc_timestamp_string))),
     local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
-    exec_env_(exec_env),
     profile_(RuntimeProfile::Create(
           obj_pool(), "Fragment " + 
PrintId(instance_ctx.fragment_instance_id))),
     instance_buffer_reservation_(new ReservationTracker) {
@@ -91,7 +90,6 @@ RuntimeState::RuntimeState(
     now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))),
     utc_timestamp_(new 
TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))),
     local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
-    exec_env_(exec_env),
     profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) {
   // We may use execution resources while evaluating exprs, etc. Decremented in
   // ReleaseResources() to release resources.
@@ -111,7 +109,7 @@ void RuntimeState::Init() {
   SCOPED_TIMER(profile_->total_time_counter());
 
   // Register with the thread mgr
-  resource_pool_ = exec_env_->thread_mgr()->CreatePool();
+  resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
   DCHECK(resource_pool_ != nullptr);
   if (fragment_ctx_ != nullptr) {
     // Ensure that the planner correctly determined the required threads.
@@ -265,7 +263,7 @@ void RuntimeState::ReleaseResources() {
   DCHECK(!released_resources_);
   if (filter_bank_ != nullptr) filter_bank_->Close();
   if (resource_pool_ != nullptr) {
-    exec_env_->thread_mgr()->DestroyPool(move(resource_pool_));
+    ExecEnv::GetInstance()->thread_mgr()->DestroyPool(move(resource_pool_));
   }
   // Release any memory associated with codegen.
   if (codegen_ != nullptr) codegen_->Close();
@@ -290,26 +288,6 @@ const std::string& RuntimeState::GetEffectiveUser() const {
   return impala::GetEffectiveUser(query_ctx().session);
 }
 
-ImpalaBackendClientCache* RuntimeState::impalad_client_cache() {
-  return exec_env_->impalad_client_cache();
-}
-
-CatalogServiceClientCache* RuntimeState::catalogd_client_cache() {
-  return exec_env_->catalogd_client_cache();
-}
-
-io::DiskIoMgr* RuntimeState::io_mgr() {
-  return exec_env_->disk_io_mgr();
-}
-
-KrpcDataStreamMgr* RuntimeState::stream_mgr() {
-  return exec_env_->stream_mgr();
-}
-
-HBaseTableFactory* RuntimeState::htable_factory() {
-  return exec_env_->htable_factory();
-}
-
 ObjectPool* RuntimeState::obj_pool() const {
   DCHECK(query_state_ != nullptr);
   return query_state_->obj_pool();

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 78a9864..6ca9574 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -106,12 +106,6 @@ class RuntimeState {
         ? instance_ctx_->fragment_instance_id
         : no_instance_id_;
   }
-  ExecEnv* exec_env() { return exec_env_; }
-  KrpcDataStreamMgr* stream_mgr();
-  HBaseTableFactory* htable_factory();
-  ImpalaBackendClientCache* impalad_client_cache();
-  CatalogServiceClientCache* catalogd_client_cache();
-  io::DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker();  // reference to the query_state_'s 
memtracker
   ReservationTracker* instance_buffer_reservation() {
@@ -327,8 +321,6 @@ class RuntimeState {
   /// Owned by a static storage member of TimezoneDatabase class. It cannot be 
nullptr.
   const Timezone* local_time_zone_;
 
-  /// TODO: get rid of this and use ExecEnv::GetInstance() instead
-  ExecEnv* exec_env_;
   boost::scoped_ptr<LlvmCodeGen> codegen_;
 
   /// Contains all ScalarFnCall expressions which need to be codegen'd.

Reply via email to