Repository: impala
Updated Branches: refs/heads/master bdd904922 -> 649f175df
cleanup: remove RuntimeState::exec_env_

Change-Id: I4a1b1bdd41e3d10982b3a4bdb0217e716b4df67f
Reviewed-on: http://gerrit.cloudera.org:8080/11269
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fccaa723
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fccaa723
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fccaa723

Branch: refs/heads/master
Commit: fccaa72366e972228178687adcb0ee1d8e8c5930
Parents: bdd9049
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Sun Aug 19 00:08:16 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Wed Aug 22 06:07:51 2018 +0000

----------------------------------------------------------------------
be/src/exec/grouping-aggregator.cc | 2 +-
be/src/exec/hbase-scan-node.cc | 4 +++-
be/src/exec/hbase-table-writer.cc | 3 ++-
be/src/exec/hdfs-orc-scanner.h | 3 ++-
be/src/exec/hdfs-scan-node-base.cc | 10 +++++----
be/src/exec/kudu-scan-node-base.cc | 2 +-
be/src/exec/kudu-table-sink.cc | 4 ++--
be/src/exec/partitioned-hash-join-builder.cc | 4 ++--
be/src/runtime/buffered-tuple-stream.cc | 2 +-
be/src/runtime/fragment-instance-state.cc | 2 +-
be/src/runtime/reservation-manager.cc | 2 +-
be/src/runtime/runtime-filter-bank.cc | 4 ++--
be/src/runtime/runtime-state.cc | 26 ++---------------------
be/src/runtime/runtime-state.h | 8 -------
14 files changed, 26 insertions(+), 50 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 42d6be8..0ac4926 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc @@ -210,7 +210,7 @@ Status GroupingAggregator::Open(RuntimeState* state) { if (ht_allocator_ == nullptr) { // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call. - ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(), + ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(), buffer_pool_client(), resource_profile_.spillable_buffer_size)); if (!is_streaming_preagg_ && needs_serialize_) { http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hbase-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc index 5aee999..148be66 100644 --- a/be/src/exec/hbase-scan-node.cc +++ b/be/src/exec/hbase-scan-node.cc @@ -19,6 +19,7 @@ #include <algorithm> +#include "runtime/exec-env.h" #include "runtime/mem-tracker.h" #include "runtime/runtime-state.h" #include "runtime/row-batch.h" @@ -60,7 +61,8 @@ Status HBaseScanNode::Prepare(RuntimeState* state) { hbase_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HBASE_READ_TIMER); AddBytesReadCounters(); - hbase_scanner_.reset(new HBaseTableScanner(this, state->htable_factory(), state)); + hbase_scanner_.reset( + new HBaseTableScanner(this, ExecEnv::GetInstance()->htable_factory(), state)); tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); if (tuple_desc_ == NULL) { http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hbase-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc index 2723eb7..cf71522 100644 --- a/be/src/exec/hbase-table-writer.cc +++ b/be/src/exec/hbase-table-writer.cc @@ -23,6 +23,7 @@ #include "common/logging.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" +#include "runtime/exec-env.h" #include 
"runtime/hbase-table-factory.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" @@ -53,7 +54,7 @@ HBaseTableWriter::HBaseTableWriter(HBaseTableDescriptor* table_desc, runtime_profile_(profile) { } Status HBaseTableWriter::Init(RuntimeState* state) { - RETURN_IF_ERROR(state->htable_factory()->GetTable(table_desc_->name(), + RETURN_IF_ERROR(ExecEnv::GetInstance()->htable_factory()->GetTable(table_desc_->name(), &table_)); encoding_timer_ = ADD_TIMER(runtime_profile_, "EncodingTimer"); htable_put_timer_ = ADD_TIMER(runtime_profile_, "HTablePutTimer"); http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hdfs-orc-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h index 132442e..5b9be5b 100644 --- a/be/src/exec/hdfs-orc-scanner.h +++ b/be/src/exec/hdfs-orc-scanner.h @@ -21,6 +21,7 @@ #include <orc/OrcFile.hh> +#include "runtime/exec-env.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/runtime-state.h" #include "exec/hdfs-scanner.h" @@ -81,7 +82,7 @@ class HdfsOrcScanner : public HdfsScanner { } uint64_t getNaturalReadSize() const { - return scanner_->state_->io_mgr()->max_buffer_size(); + return ExecEnv::GetInstance()->disk_io_mgr()->max_buffer_size(); } void read(void* buf, uint64_t length, uint64_t offset); http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 6454652..09c2c0d 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -340,7 +340,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { } RETURN_IF_ERROR(ClaimBufferReservation(state)); - reader_context_ = runtime_state_->io_mgr()->RegisterContext(); + reader_context_ = 
ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext(); // Initialize HdfsScanNode specific counters hdfs_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER); @@ -392,7 +392,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { "MaxCompressedTextFileLength", TUnit::BYTES); hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters( - &active_hdfs_read_thread_counter_, state->io_mgr()->num_total_disks() + 1); + &active_hdfs_read_thread_counter_, + ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1); counters_running_ = true; @@ -413,7 +414,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) { if (reader_context_ != nullptr) { // Need to wait for all the active scanner threads to finish to ensure there is no // more memory tracked by this scan node's mem tracker. - state->io_mgr()->UnregisterContext(reader_context_.get()); + ExecEnv::GetInstance()->disk_io_mgr()->UnregisterContext(reader_context_.get()); } StopAndFinalizeCounters(); @@ -569,7 +570,8 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, DCHECK_GE(len, 0); DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length) << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")"; - disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local); + disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue( + file, disk_id, expected_local); ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange); range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata); http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/kudu-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc index 0e7cdfa..b57f4ee 100644 --- a/be/src/exec/kudu-scan-node-base.cc +++ b/be/src/exec/kudu-scan-node-base.cc @@ -92,7 +92,7 @@ Status 
KuduScanNodeBase::Open(RuntimeState* state) { const KuduTableDescriptor* table_desc = static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc()); - RETURN_IF_ERROR(runtime_state_->exec_env()->GetKuduClient( + RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient( table_desc->kudu_master_addresses(), &client_)); uint64_t latest_ts = static_cast<uint64_t>( http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/kudu-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc index 32ba2bb..b50394d 100644 --- a/be/src/exec/kudu-table-sink.cc +++ b/be/src/exec/kudu-table-sink.cc @@ -120,8 +120,8 @@ Status KuduTableSink::Open(RuntimeState* state) { } client_tracked_bytes_ = required_mem; - RETURN_IF_ERROR( - state->exec_env()->GetKuduClient(table_desc_->kudu_master_addresses(), &client_)); + RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient( + table_desc_->kudu_master_addresses(), &client_)); KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_), "Unable to open Kudu table"); http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/partitioned-hash-join-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index c627b98..c2980b5 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -149,8 +149,8 @@ Status PhjBuilder::Open(RuntimeState* state) { } if (ht_allocator_ == nullptr) { // Create 'ht_allocator_' on the first call to Open(). 
- ht_allocator_.reset(new Suballocator( - state->exec_env()->buffer_pool(), buffer_pool_client_, spillable_buffer_size_)); + ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(), + buffer_pool_client_, spillable_buffer_size_)); } RETURN_IF_ERROR(CreateHashPartitions(0)); AllocateRuntimeFilters(); http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/buffered-tuple-stream.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc index 71175d1..e0cf854 100644 --- a/be/src/runtime/buffered-tuple-stream.cc +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -54,7 +54,7 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state, int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots) : state_(state), desc_(row_desc), - buffer_pool_(state->exec_env()->buffer_pool()), + buffer_pool_(ExecEnv::GetInstance()->buffer_pool()), buffer_pool_client_(buffer_pool_client), read_page_reservation_(buffer_pool_client_), write_page_reservation_(buffer_pool_client_), http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index 51ff13d..700a391 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -123,7 +123,7 @@ void FragmentInstanceState::Cancel() { DCHECK(runtime_state_ != nullptr); runtime_state_->set_is_cancelled(); if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_); - runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id()); + ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id()); } Status FragmentInstanceState::Prepare() { 
http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/reservation-manager.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/reservation-manager.cc b/be/src/runtime/reservation-manager.cc index 1712e09..c87f92c 100644 --- a/be/src/runtime/reservation-manager.cc +++ b/be/src/runtime/reservation-manager.cc @@ -44,7 +44,7 @@ void ReservationManager::Close(RuntimeState* state) { VLOG_FILE << name_ << " returning reservation " << resource_profile_.min_reservation; state->query_state()->initial_reservations()->Return( &buffer_pool_client_, resource_profile_.min_reservation); - state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_); + ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_); } } http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-filter-bank.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index 64638a6..e1a2512 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -61,7 +61,7 @@ Status RuntimeFilterBank::ClaimBufferReservation() { DCHECK(!buffer_pool_client_.is_registered()); string filter_bank_name = Substitute( "RuntimeFilterBank (Fragment Id: $0)", PrintId(state_->fragment_instance_id())); - RETURN_IF_ERROR(state_->exec_env()->buffer_pool()->RegisterClient(filter_bank_name, + RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(filter_bank_name, state_->query_state()->file_group(), state_->instance_buffer_reservation(), filter_mem_tracker_.get(), total_bloom_filter_mem_required_, state_->runtime_profile(), &buffer_pool_client_)); @@ -268,7 +268,7 @@ void RuntimeFilterBank::Close() { << ") returning reservation " << total_bloom_filter_mem_required_; state_->query_state()->initial_reservations()->Return( &buffer_pool_client_, 
total_bloom_filter_mem_required_); - state_->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_); + ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_); } DCHECK_EQ(filter_mem_tracker_->consumption(), 0); filter_mem_tracker_->Close(); http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 5bb5a2d..dedb5f5 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -73,7 +73,6 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag utc_timestamp_(new TimestampValue(TimestampValue::Parse( query_state->query_ctx().utc_timestamp_string))), local_time_zone_(&TimezoneDatabase::GetUtcTimezone()), - exec_env_(exec_env), profile_(RuntimeProfile::Create( obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))), instance_buffer_reservation_(new ReservationTracker) { @@ -91,7 +90,6 @@ RuntimeState::RuntimeState( now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))), utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))), local_time_zone_(&TimezoneDatabase::GetUtcTimezone()), - exec_env_(exec_env), profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) { // We may use execution resources while evaluating exprs, etc. Decremented in // ReleaseResources() to release resources. @@ -111,7 +109,7 @@ void RuntimeState::Init() { SCOPED_TIMER(profile_->total_time_counter()); // Register with the thread mgr - resource_pool_ = exec_env_->thread_mgr()->CreatePool(); + resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool(); DCHECK(resource_pool_ != nullptr); if (fragment_ctx_ != nullptr) { // Ensure that the planner correctly determined the required threads. 
@@ -265,7 +263,7 @@ void RuntimeState::ReleaseResources() { DCHECK(!released_resources_); if (filter_bank_ != nullptr) filter_bank_->Close(); if (resource_pool_ != nullptr) { - exec_env_->thread_mgr()->DestroyPool(move(resource_pool_)); + ExecEnv::GetInstance()->thread_mgr()->DestroyPool(move(resource_pool_)); } // Release any memory associated with codegen. if (codegen_ != nullptr) codegen_->Close(); @@ -290,26 +288,6 @@ const std::string& RuntimeState::GetEffectiveUser() const { return impala::GetEffectiveUser(query_ctx().session); } -ImpalaBackendClientCache* RuntimeState::impalad_client_cache() { - return exec_env_->impalad_client_cache(); -} - -CatalogServiceClientCache* RuntimeState::catalogd_client_cache() { - return exec_env_->catalogd_client_cache(); -} - -io::DiskIoMgr* RuntimeState::io_mgr() { - return exec_env_->disk_io_mgr(); -} - -KrpcDataStreamMgr* RuntimeState::stream_mgr() { - return exec_env_->stream_mgr(); -} - -HBaseTableFactory* RuntimeState::htable_factory() { - return exec_env_->htable_factory(); -} - ObjectPool* RuntimeState::obj_pool() const { DCHECK(query_state_ != nullptr); return query_state_->obj_pool(); http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 78a9864..6ca9574 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -106,12 +106,6 @@ class RuntimeState { ? 
instance_ctx_->fragment_instance_id : no_instance_id_; } - ExecEnv* exec_env() { return exec_env_; } - KrpcDataStreamMgr* stream_mgr(); - HBaseTableFactory* htable_factory(); - ImpalaBackendClientCache* impalad_client_cache(); - CatalogServiceClientCache* catalogd_client_cache(); - io::DiskIoMgr* io_mgr(); MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); } MemTracker* query_mem_tracker(); // reference to the query_state_'s memtracker ReservationTracker* instance_buffer_reservation() { @@ -327,8 +321,6 @@ class RuntimeState { /// Owned by a static storage member of TimezoneDatabase class. It cannot be nullptr. const Timezone* local_time_zone_; - /// TODO: get rid of this and use ExecEnv::GetInstance() instead - ExecEnv* exec_env_; boost::scoped_ptr<LlvmCodeGen> codegen_; /// Contains all ScalarFnCall expressions which need to be codegen'd.