http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index dec6aa6..287f53a 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -17,122 +17,74 @@
 
 #include "runtime/io/disk-io-mgr-internal.h"
 
-#include "runtime/exec-env.h"
-
 #include "common/names.h"
 
 using namespace impala;
 using namespace impala::io;
 
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
-    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
-    int64_t buffer_len)
-  : io_mgr_(io_mgr),
-    reader_(reader),
-    scan_range_(scan_range),
-    buffer_(buffer),
-    buffer_len_(buffer_len) {
-  DCHECK(io_mgr != nullptr);
-  DCHECK(scan_range != nullptr);
-  DCHECK(buffer != nullptr);
-  DCHECK_GE(buffer_len, 0);
-}
+void RequestContext::Cancel(const Status& status) {
+  DCHECK(!status.ok());
 
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
-    ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
-    BufferPool::BufferHandle handle) :
-  io_mgr_(io_mgr),
-  reader_(reader),
-  scan_range_(scan_range),
-  buffer_(handle.data()),
-  buffer_len_(handle.len()),
-  bp_client_(bp_client),
-  handle_(move(handle)) {
-  DCHECK(io_mgr != nullptr);
-  DCHECK(scan_range != nullptr);
-  DCHECK(bp_client_->is_registered());
-  DCHECK(handle_.is_open());
-}
-
-void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
-  DCHECK(buffer->buffer_ != nullptr);
-  if (!buffer->is_cached() && !buffer->is_client_buffer()) {
-    // Only buffers that were allocated by DiskIoMgr need to be freed.
-    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
-        buffer->bp_client_, &buffer->handle_);
-  }
-  buffer->buffer_ = nullptr;
-}
-
-// Cancellation of a RequestContext requires coordination from multiple threads that may
-// hold references to the context:
-// 1. Disk threads that are currently processing a range for this context.
-// 2. Caller threads that are waiting in GetNext().
-//
-// Each thread that currently has a reference to the request context must notice the
-// cancel, cancel any pending operations involving the context and remove the context
-// from tracking structures. Once no more operations are pending on the context and no
-// more I/O mgr threads hold references to the context, the context can be marked
-// inactive (see CancelAndMarkInactive()), after which the owner of the context object
-// can free it.
-//
-// The steps are:
-// 1. Cancel() will immediately set the context in the Cancelled state. This prevents any
-// other thread from adding more ready buffers to the context (they all take a lock and
-// check the state before doing so), or any write ranges to the context.
-// 2. Cancel() will call Cancel() on each ScanRange that is not yet complete, unblocking
-// any threads in GetNext(). If there was no prior error for a scan range, any reads from
-// that scan range will return a CANCELLED Status. Cancel() also invokes callbacks for
-// all WriteRanges with a CANCELLED Status.
-// 3. Disk threads notice the context is cancelled either when picking the next context
-// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
-// state, they remove the context from the disk queue. The last thread per disk then
-// calls DecrementDiskRefCount(). After the last disk thread has called
-// DecrementDiskRefCount(), cancellation is done and it is safe to unregister the
-// context.
-void RequestContext::Cancel() {
   // Callbacks are collected in this vector and invoked while no lock is held.
   vector<WriteRange::WriteDoneCallback> write_callbacks;
   {
-    unique_lock<mutex> lock(lock_);
+    lock_guard<mutex> lock(lock_);
     DCHECK(Validate()) << endl << DebugString();
 
     // Already being cancelled
     if (state_ == RequestContext::Cancelled) return;
 
+    DCHECK(status_.ok());
+    status_ = status;
+
    // The reader will be put into a cancelled state until all cleanup is complete.
     state_ = RequestContext::Cancelled;
 
-    // Clear out all request ranges from queues for this reader. Cancel the scan
-    // ranges and invoke the write range callbacks to propagate the cancellation.
-    for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED);
-    active_scan_ranges_.clear();
-    for (PerDiskState& disk_state : disk_states_) {
-      RequestRange* range;
-      while ((range = disk_state.in_flight_ranges()->Dequeue()) != nullptr) {
-        if (range->request_type() == RequestType::WRITE) {
+    // Cancel all scan ranges for this reader. Each range could be on one of
+    // five queues.
+    for (int i = 0; i < disk_states_.size(); ++i) {
+      RequestContext::PerDiskState& state = disk_states_[i];
+      RequestRange* range = NULL;
+      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
+        if (range->request_type() == RequestType::READ) {
+          static_cast<ScanRange*>(range)->Cancel(status);
+        } else {
+          DCHECK(range->request_type() == RequestType::WRITE);
           write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
         }
       }
-      while (disk_state.unstarted_scan_ranges()->Dequeue() != nullptr);
+
+      ScanRange* scan_range;
+      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
+        scan_range->Cancel(status);
+      }
       WriteRange* write_range;
-      while ((write_range = disk_state.unstarted_write_ranges()->Dequeue()) != nullptr) {
+      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
        write_callbacks.push_back(write_range->callback_);
      }
    }
 
-    // Clear out the lists of scan ranges.
-    while (ready_to_start_ranges_.Dequeue() != nullptr);
-    while (cached_ranges_.Dequeue() != nullptr);
-    // Ensure that the reader is scheduled on all disks (it may already be scheduled on
-    // some). The disk threads will notice that the context is cancelled and do any
-    // required cleanup for the disk state.
+    ScanRange* range = NULL;
+    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
+      range->Cancel(status);
+    }
+    while ((range = blocked_ranges_.Dequeue()) != NULL) {
+      range->Cancel(status);
+    }
+    while ((range = cached_ranges_.Dequeue()) != NULL) {
+      range->Cancel(status);
+    }
+
+    // Schedule reader on all disks. The disks will notice it is cancelled and do any
+    // required cleanup.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      disk_states_[i].ScheduleContext(lock, this, i);
+      RequestContext::PerDiskState& state = disk_states_[i];
+      state.ScheduleContext(this, i);
    }
  }
 
  for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
-    write_callback(Status::CANCELLED);
+    write_callback(status_);
  }
 
  // Signal reader and unblock the GetNext/Read thread. That read will fail with
@@ -141,7 +93,7 @@ void RequestContext::Cancel() {
 }
 
 void RequestContext::CancelAndMarkInactive() {
-  Cancel();
+  Cancel(Status::CANCELLED);
 
   boost::unique_lock<boost::mutex> l(lock_);
   DCHECK_NE(state_, Inactive);
@@ -150,76 +102,54 @@ void RequestContext::CancelAndMarkInactive() {
   // Wait until the ranges finish up.
   while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
 
-  // Validate that no ranges are active.
-  DCHECK_EQ(0, active_scan_ranges_.size()) << endl << DebugString();
-
-  // Validate that no threads are active and the context is not queued.
-  for (const PerDiskState& disk_state : disk_states_) {
-    DCHECK_EQ(0, disk_state.in_flight_ranges()->size()) << endl << DebugString();
-    DCHECK_EQ(0, disk_state.unstarted_scan_ranges()->size()) << endl << DebugString();
-    DCHECK_EQ(0, disk_state.num_threads_in_op()) << endl << DebugString();
-    DCHECK(!disk_state.is_on_queue()) << endl << DebugString();
-  }
+  // Validate that no buffers were leaked from this context.
+  DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
+  DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
 
   DCHECK(Validate()) << endl << DebugString();
   state_ = Inactive;
 }
 
-void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock,
-    RequestRange* range, ScheduleMode schedule_mode) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  DCHECK_EQ(state_, Active) << DebugString();
-  PerDiskState* disk_state = &disk_states_[range->disk_id()];
-  if (disk_state->done()) {
-    DCHECK_EQ(disk_state->num_remaining_ranges(), 0);
-    disk_state->set_done(false);
+void RequestContext::AddRequestRange(
+    RequestRange* range, bool schedule_immediately) {
+  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
+  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+  if (state.done()) {
+    DCHECK_EQ(state.num_remaining_ranges(), 0);
+    state.set_done(false);
     ++num_disks_with_ranges_;
   }
+
+  bool schedule_context;
   if (range->request_type() == RequestType::READ) {
     ScanRange* scan_range = static_cast<ScanRange*>(range);
-    if (schedule_mode == ScheduleMode::IMMEDIATELY) {
-      ScheduleScanRange(lock, scan_range);
-    } else if (schedule_mode == ScheduleMode::UPON_GETNEXT) {
-      disk_state->unstarted_scan_ranges()->Enqueue(scan_range);
+    if (schedule_immediately) {
+      ScheduleScanRange(scan_range);
+    } else {
+      state.unstarted_scan_ranges()->Enqueue(scan_range);
       num_unstarted_scan_ranges_.Add(1);
-      // If there's no 'next_scan_range_to_start', schedule this RequestContext so that
-      // one of the 'unstarted_scan_ranges' will become the 'next_scan_range_to_start'.
-      if (disk_state->next_scan_range_to_start() == nullptr) {
-        disk_state->ScheduleContext(lock, this, range->disk_id());
-      }
     }
+    // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
+    // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
+    // invoked.
+    schedule_context = state.next_scan_range_to_start() == NULL;
   } else {
     DCHECK(range->request_type() == RequestType::WRITE);
-    DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << static_cast<int>(schedule_mode);
+    DCHECK(!schedule_immediately);
     WriteRange* write_range = static_cast<WriteRange*>(range);
-    disk_state->unstarted_write_ranges()->Enqueue(write_range);
+    state.unstarted_write_ranges()->Enqueue(write_range);
 
-    // Ensure that the context is scheduled so that the write range gets picked up.
-    // ScheduleContext() has no effect if already scheduled, so this is safe to do always.
-    disk_state->ScheduleContext(lock, this, range->disk_id());
+    // ScheduleContext() has no effect if the context is already scheduled,
+    // so this is safe.
+    schedule_context = true;
   }
-  ++disk_state->num_remaining_ranges();
-}
-
-void RequestContext::AddActiveScanRangeLocked(
-    const unique_lock<mutex>& lock, ScanRange* range) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  DCHECK(state_ == Active);
-  active_scan_ranges_.insert(range);
-}
 
-void RequestContext::RemoveActiveScanRange(ScanRange* range) {
-  unique_lock<mutex> lock(lock_);
-  RemoveActiveScanRangeLocked(lock, range);
+  if (schedule_context) state.ScheduleContext(this, range->disk_id());
+  ++state.num_remaining_ranges();
 }
 
-void RequestContext::RemoveActiveScanRangeLocked(
-    const unique_lock<mutex>& lock, ScanRange* range) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  active_scan_ranges_.erase(range);
-}
-
-RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
-  : parent_(parent), disk_states_(num_disks) {}
+RequestContext::RequestContext(
+    DiskIoMgr* parent, int num_disks, MemTracker* tracker)
+  : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
 
 // Dumps out request context information. Lock should be taken by caller
 string RequestContext::DebugString() const {
@@ -229,9 +159,13 @@ string RequestContext::DebugString() const {
   if (state_ == RequestContext::Cancelled) ss << "Cancelled";
   if (state_ == RequestContext::Active) ss << "Active";
   if (state_ != RequestContext::Inactive) {
-    ss << " #disk_with_ranges=" << num_disks_with_ranges_
-       << " #disks=" << num_disks_with_ranges_
-       << " #active scan ranges=" << active_scan_ranges_.size();
+    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
+       << " #ready_buffers=" << num_ready_buffers_.Load()
+       << " #used_buffers=" << num_used_buffers_.Load()
+       << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
+       << " #finished_scan_ranges=" << num_finished_ranges_.Load()
+       << " #disk_with_ranges=" << num_disks_with_ranges_
+       << " #disks=" << num_disks_with_ranges_;
     for (int i = 0; i < disk_states_.size(); ++i) {
       ss << endl << " " << i << ": "
          << "is_on_queue=" << disk_states_[i].is_on_queue()
@@ -254,6 +188,16 @@ bool RequestContext::Validate() const {
     return false;
   }
 
+  if (num_used_buffers_.Load() < 0) {
+    LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
+    return false;
+  }
+
+  if (num_ready_buffers_.Load() < 0) {
+    LOG(WARNING) << "num_ready_buffers_ < 0: #ready=" << num_ready_buffers_.Load();
+    return false;
+  }
+
   int total_unstarted_ranges = 0;
   for (int i = 0; i < disk_states_.size(); ++i) {
     const PerDiskState& state = disk_states_[i];
@@ -331,8 +275,8 @@ bool RequestContext::Validate() const {
       LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
       return false;
     }
-    if (!active_scan_ranges_.empty()) {
-      LOG(WARNING) << "Reader cancelled but has active ranges.";
+    if (!blocked_ranges_.empty()) {
+      LOG(WARNING) << "Reader cancelled but has blocked ranges.";
       return false;
     }
   }
@@ -340,11 +284,10 @@ bool RequestContext::Validate() const {
   return true;
 }
 
-void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& context_lock,
+void RequestContext::PerDiskState::ScheduleContext(
     RequestContext* context, int disk_id) {
-  DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
-  if (is_on_queue_.Load() == 0 && !done_) {
-    is_on_queue_.Store(1);
+  if (!is_on_queue_ && !done_) {
+    is_on_queue_ = true;
     context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
   }
 }
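For readers following the cancellation flow above: the new Cancel(const Status&) collects the write callbacks while holding lock_ and only invokes them after the lock is dropped, so a callback can safely re-enter the context. Below is a minimal standalone C++ sketch of that pattern; Context, AddWrite and the Status struct are simplified stand-ins for illustration, not Impala's actual types.

#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for impala::Status.
struct Status {
  bool ok;
  std::string msg;
};

using WriteDoneCallback = std::function<void(const Status&)>;

class Context {
 public:
  // Queue a write; its callback fires on completion or cancellation.
  void AddWrite(WriteDoneCallback cb) {
    std::lock_guard<std::mutex> l(lock_);
    pending_writes_.push_back(std::move(cb));
  }

  // Mirrors the shape of RequestContext::Cancel(): record the status and
  // collect callbacks under the lock, but invoke them only after the lock is
  // released so a callback can safely call back into this object.
  void Cancel(const Status& status) {
    std::vector<WriteDoneCallback> callbacks;
    {
      std::lock_guard<std::mutex> l(lock_);
      if (cancelled_) return;  // Already being cancelled.
      cancelled_ = true;
      status_ = status;
      callbacks.swap(pending_writes_);
    }
    for (const WriteDoneCallback& cb : callbacks) cb(status);
  }

 private:
  std::mutex lock_;
  bool cancelled_ = false;
  Status status_{true, ""};
  std::vector<WriteDoneCallback> pending_writes_;
};

int main() {
  Context ctx;
  ctx.AddWrite([](const Status& s) { std::cout << "write: " << s.msg << "\n"; });
  ctx.Cancel(Status{false, "CANCELLED"});  // Prints "write: CANCELLED".
}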
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index b028596..fd68669 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -23,13 +23,10 @@
 namespace impala {
 namespace io {
-
-// Mode argument for AddRangeToDisk().
-enum class ScheduleMode {
-  IMMEDIATELY, UPON_GETNEXT, BY_CALLER
-};
 /// A request context is used to group together I/O requests belonging to a client of the
-/// I/O manager for management and scheduling.
+/// I/O manager for management and scheduling. For most I/O manager clients it is an
+/// opaque pointer, but some clients may need to include this header, e.g. to make the
+/// unique_ptr<DiskIoRequestContext> destructor work correctly.
 ///
 /// Implementation Details
 /// ======================
@@ -37,109 +34,56 @@ enum class ScheduleMode {
 /// maintains state across all disks as well as per disk state.
 /// The unit for an IO request is a RequestRange, which may be a ScanRange or a
 /// WriteRange.
-/// A scan range for the reader is in one of six states:
-/// 1) PerDiskState's 'unstarted_scan_ranges_': This range has only been queued
+/// A scan range for the reader is in one of five states:
+/// 1) PerDiskState's unstarted_ranges: This range has only been queued
 ///    and nothing has been read from it.
-/// 2) RequestContext's 'ready_to_start_ranges_': This range is about to be started.
-///    As soon as the reader picks it up, it will move to the 'in_flight_ranges_'
+/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started.
+///    As soon as the reader picks it up, it will move to the in_flight_ranges
 ///    queue.
-/// 3) PerDiskState's 'in_flight_ranges_': This range is being processed and will
+/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
 ///    be read from the next time a disk thread picks it up in GetNextRequestRange()
-/// 4) The ScanRange is blocked waiting for buffers because it does not have any unused
-///    buffers to read data into. It is unblocked when a client adds new buffers via
-///    AllocateBuffersForRange() or returns existing buffers via ReturnBuffer().
-///    ScanRanges in this state are identified by 'blocked_on_buffer_' == true.
-/// 5) ScanRange is cached and in the 'cached_ranges_' queue.
-/// 6) Inactive - either all the data for the range was returned or the range was
-///    cancelled. I.e. ScanRange::eosr_ is true or ScanRange::cancel_status_ != OK.
-///
-/// If the scan range is read and does not get blocked waiting for buffers, the
+/// 4) The ScanRange's queue of outgoing ready buffers is full. We can't read from
+///    this range anymore. We need the caller to pull a buffer off, which will put
+///    this range back in the in_flight_ranges queue. These ranges are in the
+///    RequestContext's blocked_ranges_ queue.
+/// 5) ScanRange is cached and in the cached_ranges_ queue.
+//
+/// If the scan range is read and does not get blocked on the outgoing queue, the
 /// transitions are: 1 -> 2 -> 3.
 /// If the scan range does get blocked, the transitions are
 /// 1 -> 2 -> 3 -> (4 -> 3)*
-///
-/// In the case of a cached scan range, the range is immediately put in 'cached_ranges_'.
+//
+/// In the case of a cached scan range, the range is immediately put in cached_ranges_.
 /// When the caller asks for the next range to process, we first pull ranges from
-/// the 'cache_ranges_' queue. If the range was cached, the range is removed and
+/// the cache_ranges_ queue. If the range was cached, the range is removed and
 /// done (ranges are either entirely cached or not at all). If the cached read attempt
 /// fails, we put the range in state 1.
-///
-/// All scan ranges in states 1-5 are tracked in 'active_scan_ranges_' so that they can
-/// be cancelled when the RequestContext is cancelled. Scan ranges are removed from
-/// 'active_scan_ranges_' during their transition to state 6.
-///
-/// A write range for a context may be in one of two queues:
-/// 1) 'unstarted_write_ranges_': Ranges that have been queued but not processed.
-/// 2) 'in_flight_ranges_': The write range is ready to be processed by the next disk
-///    thread that picks it up in GetNextRequestRange().
-///
+//
+/// A write range for a context may be in one of two lists:
+/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed.
+/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread
+///    that picks it up in GetNextRequestRange().
+//
 /// AddWriteRange() adds WriteRanges for a disk.
 /// It is the responsibility of the client to pin the data to be written via a WriteRange
 /// in memory. After a WriteRange has been written, a callback is invoked to inform the
 /// client that the write has completed.
-///
+//
 /// An important assumption is that write does not exceed the maximum read size and that
 /// the entire range is written when the write request is handled. (In other words, writes
 /// are not broken up.)
-///
+//
 /// When a RequestContext is processed by a disk thread in GetNextRequestRange(),
 /// a write range is always removed from the list of unstarted write ranges and appended
 /// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
-/// that is scheduled (by calling GetNextUnstartedRange()) is always followed by a write
-/// (if one exists). And since at most one WriteRange can be present in in_flight_ranges_
-/// at any time (once a write range is returned from GetNextRequestRange() it is completed
-/// and not re-enqueued), a scan range scheduled via a call to GetNextUnstartedRange() can
-/// be queued up behind at most one write range.
+/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
+/// exists). And since at most one WriteRange can be present in in_flight_ranges_ at any
+/// time (once a write range is returned from GetNextRequestRange() it is completed and
+/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
+/// behind at most one write range.
 class RequestContext {
  public:
-  ~RequestContext() {
-    DCHECK_EQ(state_, Inactive) << "Must be unregistered. " << DebugString();
-  }
-
-  /// Cancel the context asynchronously. All outstanding requests are cancelled
-  /// asynchronously. This does not need to be called if the context finishes normally.
-  /// Calling GetNext() on any scan ranges belonging to this RequestContext will return
-  /// CANCELLED (or another error, if an error was encountered for that scan range before
-  /// it is cancelled).
-  void Cancel();
-
-  bool IsCancelled() {
-    boost::unique_lock<boost::mutex> lock(lock_);
-    return state_ == Cancelled;
-  }
-
-  int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
-  int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
-  int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
-  int num_remote_ranges() const { return num_remote_ranges_.Load(); }
-  int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); }
-
-  int cached_file_handles_hit_count() const {
-    return cached_file_handles_hit_count_.Load();
-  }
-
-  int cached_file_handles_miss_count() const {
-    return cached_file_handles_miss_count_.Load();
-  }
-
-  void set_bytes_read_counter(RuntimeProfile::Counter* bytes_read_counter) {
-    bytes_read_counter_ = bytes_read_counter;
-  }
-
-  void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = read_timer; }
-
-  void set_open_file_timer(RuntimeProfile::Counter* open_file_timer) {
-    open_file_timer_ = open_file_timer;
-  }
-
-  void set_active_read_thread_counter(
-      RuntimeProfile::Counter* active_read_thread_counter) {
-    active_read_thread_counter_ = active_read_thread_counter;
-  }
-
-  void set_disks_accessed_bitmap(RuntimeProfile::Counter* disks_accessed_bitmap) {
-    disks_accessed_bitmap_ = disks_accessed_bitmap;
-  }
+  ~RequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; }
 
  private:
  DISALLOW_COPY_AND_ASSIGN(RequestContext);
@@ -162,19 +106,13 @@ class RequestContext {
     Inactive,
   };
 
-  RequestContext(DiskIoMgr* parent, int num_disks);
-
-  /// Cleans up a buffer. If the buffer was allocated with AllocateBuffersForRange(),
-  /// frees the buffer. Otherwise (e.g. a client or HDFS cache buffer), just prepares the
-  /// descriptor to be destroyed. After this is called, buffer->buffer() is NULL.
-  /// Does not acquire 'lock_'.
-  void FreeBuffer(BufferDescriptor* buffer);
+  RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
 
   /// Decrements the number of active disks for this reader. If the disk count
   /// goes to 0, the disk complete condition variable is signaled.
-  /// 'lock_' must be held via 'lock'.
-  void DecrementDiskRefCount(const boost::unique_lock<boost::mutex>& lock) {
-    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  /// Reader lock must be taken before this call.
+  void DecrementDiskRefCount() {
+    // boost doesn't let us dcheck that the reader lock is taken
     DCHECK_GT(num_disks_with_ranges_, 0);
     if (--num_disks_with_ranges_ == 0) {
       disks_complete_cond_var_.NotifyAll();
@@ -191,48 +129,25 @@ class RequestContext {
 
   /// Adds range to in_flight_ranges, scheduling this reader on the disk threads
   /// if necessary.
-  /// 'lock_' must be held via 'lock'. Only valid to call if this context is active.
-  void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) {
-    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  /// Reader lock must be taken before this.
+  void ScheduleScanRange(ScanRange* range) {
    DCHECK_EQ(state_, Active);
-    DCHECK(range != nullptr);
+    DCHECK(range != NULL);
    RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
    state.in_flight_ranges()->Enqueue(range);
-    state.ScheduleContext(lock, this, range->disk_id());
+    state.ScheduleContext(this, range->disk_id());
  }
 
+  /// Cancels the context with status code 'status'.
+  void Cancel(const Status& status);
+
   /// Cancel the context if not already cancelled, wait for all scan ranges to finish
   /// and mark the context as inactive, after which it cannot be used.
   void CancelAndMarkInactive();
 
-  /// Adds a request range to the appropriate disk state. 'schedule_mode' controls which
-  /// queue the range is placed in. This RequestContext is scheduled on the disk state
-  /// if required by 'schedule_mode'.
-  ///
-  /// Write ranges must always have 'schedule_mode' IMMEDIATELY and are added to the
-  /// 'unstarted_write_ranges_' queue, from which they will be asynchronously moved to the
-  /// 'in_flight_ranges_' queue.
-  ///
-  /// Scan ranges can have different 'schedule_mode' values. If IMMEDIATELY, the range is
-  /// immediately added to the 'in_flight_ranges_' queue where it will be processed
-  /// asynchronously by disk threads. If UPON_GETNEXT, the range is added to the
-  /// 'unstarted_ranges_' queue, from which it can be returned to a client by
-  /// DiskIoMgr::GetNextUnstartedRange(). If BY_CALLER, the scan range is not added to
-  /// any queues. The range will be scheduled later as a separate step, e.g. when it is
-  /// unblocked by adding buffers to it. Caller must hold 'lock_' via 'lock'.
-  void AddRangeToDisk(const boost::unique_lock<boost::mutex>& lock, RequestRange* range,
-      ScheduleMode schedule_mode);
-
-  /// Adds an active range to 'active_scan_ranges_'
-  void AddActiveScanRangeLocked(
-      const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
-
-  /// Removes the range from 'active_scan_ranges_'. Called by ScanRange after eos or
-  /// cancellation. If calling the Locked version, the caller must hold
-  /// 'lock_'. Otherwise the function will acquire 'lock_'.
-  void RemoveActiveScanRange(ScanRange* range);
-  void RemoveActiveScanRangeLocked(
-      const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
+  /// Adds request range to disk queue for this request context. Currently,
+  /// schedule_immediately must be false if the RequestRange is a write range.
+  void AddRequestRange(RequestRange* range, bool schedule_immediately);
 
   /// Validates invariants of reader. Reader lock must be taken beforehand.
   bool Validate() const;
@@ -243,6 +158,9 @@ class RequestContext {
   /// Parent object
   DiskIoMgr* const parent_;
 
+  /// Memory used for this reader. This is not owned by this object.
+  MemTracker* const mem_tracker_;
+
   /// Total bytes read for this reader
   RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
 
@@ -272,6 +190,13 @@ class RequestContext {
   /// Total number of bytes from remote reads that were expected to be local.
   AtomicInt64 unexpected_remote_bytes_{0};
 
+  /// The number of buffers that have been returned to the reader (via GetNext) that the
+  /// reader has not returned. Only included for debugging and diagnostics.
+  AtomicInt32 num_buffers_in_reader_{0};
+
+  /// The number of scan ranges that have been completed for this reader.
+  AtomicInt32 num_finished_ranges_{0};
+
   /// The number of scan ranges that required a remote read, updated at the end of each
   /// range scan. Only used for diagnostics.
   AtomicInt32 num_remote_ranges_{0};
@@ -286,6 +211,17 @@ class RequestContext {
   /// Total number of file handle opens where the file handle was not in the cache
   AtomicInt32 cached_file_handles_miss_count_{0};
 
+  /// The number of buffers that are being used for this reader. This is the sum
+  /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
+  /// to be queued). This includes both IOMgr-allocated buffers and client-provided
+  /// buffers.
+  AtomicInt32 num_used_buffers_{0};
+
+  /// The total number of ready buffers across all ranges. Ready buffers are buffers
+  /// that have been read from disk but not retrieved by the caller.
+  /// This is the sum of all queued buffers in all ranges for this reader context.
+  AtomicInt32 num_ready_buffers_{0};
+
   /// All fields below are accessed by multiple threads and the lock needs to be
   /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
   /// are held simultaneously.
@@ -294,16 +230,8 @@ class RequestContext {
   /// Current state of the reader
   State state_ = Active;
 
-  /// Scan ranges that have been added to the IO mgr for this context. Ranges can only
-  /// be added when 'state_' is Active. When this context is cancelled, Cancel() is
-  /// called for all the active ranges. If a client attempts to add a range while
-  /// 'state_' is Cancelled, the range is not added to this list and Status::CANCELLED
-  /// is returned to the client. This ensures that all active ranges are cancelled as a
-  /// result of RequestContext cancellation.
-  /// Ranges can be cancelled or hit eos non-atomically with their removal from this set,
-  /// so eos or cancelled ranges may be temporarily present here. Cancelling these ranges
-  /// a second time or cancelling after eos is safe and has no effect.
-  boost::unordered_set<ScanRange*> active_scan_ranges_;
+  /// Status of this reader. Set to non-ok if cancelled.
+  Status status_;
 
   /// The number of disks with scan ranges remaining (always equal to the sum of
   /// disks with ranges).
@@ -312,18 +240,21 @@ class RequestContext {
   /// This is the list of ranges that are expected to be cached on the DN.
   /// When the reader asks for a new range (GetNextScanRange()), we first
   /// return ranges from this list.
-  InternalList<ScanRange> cached_ranges_;
+  InternalQueue<ScanRange> cached_ranges_;
 
   /// A list of ranges that should be returned in subsequent calls to
-  /// GetNextUnstartedRange().
+  /// GetNextRange.
   /// There is a trade-off with when to populate this list. Populating it on
-  /// demand means consumers need to wait (happens in DiskIoMgr::GetNextUnstartedRange()).
+  /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()).
   /// Populating it preemptively means we make worse scheduling decisions.
   /// We currently populate one range per disk.
   /// TODO: think about this some more.
-  InternalList<ScanRange> ready_to_start_ranges_;
+  InternalQueue<ScanRange> ready_to_start_ranges_;
   ConditionVariable ready_to_start_ranges_cv_;  // used with lock_
 
+  /// Ranges that are blocked due to back pressure on outgoing buffers.
+  InternalQueue<ScanRange> blocked_ranges_;
+
   /// Condition variable for UnregisterContext() to wait for all disks to complete
   ConditionVariable disks_complete_cond_var_;
 
@@ -342,9 +273,21 @@ class RequestContext {
       next_scan_range_to_start_ = range;
     }
 
-    bool is_on_queue() const { return is_on_queue_.Load() != 0; }
+    /// We need to have a memory barrier to prevent this load from being reordered
+    /// with num_threads_in_op(), since these variables are set without the reader
+    /// lock taken.
+    bool is_on_queue() const {
+      bool b = is_on_queue_;
+      __sync_synchronize();
+      return b;
+    }
 
-    int num_threads_in_op() const { return num_threads_in_op_.Load(); }
+    int num_threads_in_op() const {
+      int v = num_threads_in_op_.Load();
+      // TODO: determine whether this barrier is necessary for any callsites.
+      AtomicUtil::MemoryBarrier();
+      return v;
+    }
 
     const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
       return &unstarted_scan_ranges_;
@@ -363,41 +306,26 @@ class RequestContext {
     InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
 
     /// Schedules the request context on this disk if it's not already on the queue.
-    /// context->lock_ must be held by the caller via 'context_lock'.
-    void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
-        RequestContext* context, int disk_id);
-
-    /// Increment the count of disk threads that have a reference to this context. These
-    /// threads do not hold any locks while reading from HDFS, so we need to prevent the
-    /// RequestContext from being destroyed underneath them.
-    ///
-    /// The caller does not need to hold 'lock_', so this can execute concurrently with
-    /// itself and DecrementDiskThread().
-    void IncrementDiskThreadAndDequeue() {
-      /// Incrementing 'num_threads_in_op_' first so that there is no window when other
-      /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
-      /// references left to this context.
+    /// Context lock must be taken before this.
+    void ScheduleContext(RequestContext* context, int disk_id);
+
+    /// Increment the ref count on reader. We need to track the number of threads per
+    /// reader per disk that are in the unlocked hdfs read code section. This is updated
+    /// by multiple threads without a lock so we need to use an atomic int.
+    void IncrementRequestThreadAndDequeue() {
      num_threads_in_op_.Add(1);
-      is_on_queue_.Store(0);
+      is_on_queue_ = false;
    }
 
-    /// Decrement the count of disk threads with a reference to this context. Does final
-    /// cleanup if the context is cancelled and this is the last thread for the disk.
-    /// context->lock_ must be held by the caller via 'context_lock'.
-    void DecrementDiskThread(const boost::unique_lock<boost::mutex>& context_lock,
-        RequestContext* context) {
-      DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
-      num_threads_in_op_.Add(-1);
-
-      if (context->state_ != Cancelled) {
-        DCHECK_EQ(context->state_, Active);
-        return;
-      }
-      // The state is cancelled, check to see if we're the last thread to touch the
-      // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_'
-      // in this order to avoid a race with IncrementDiskThreadAndDequeue().
-      if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) {
-        context->DecrementDiskRefCount(context_lock);
+    void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
+
+    /// Decrement request thread count and do final cleanup if this is the last
+    /// thread. RequestContext lock must be taken before this.
+    void DecrementRequestThreadAndCheckDone(RequestContext* context) {
+      num_threads_in_op_.Add(-1);  // Also acts as a barrier.
+      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
+        // This thread is the last one for this reader on this disk, do final cleanup
+        context->DecrementDiskRefCount();
        done_ = true;
      }
    }
@@ -410,12 +338,7 @@ class RequestContext {
     bool done_ = true;
 
     /// For each disk, keeps track if the context is on this disk's queue, indicating
-    /// the disk must do some work for this context. 1 means that the context is on the
-    /// disk queue, 0 means that it's not on the queue (either because it has no ranges
-    /// active for the disk or because a disk thread dequeued the context and is
-    /// currently processing a request).
-    ///
-    /// The disk needs to do work in 4 cases:
+    /// the disk must do some work for this context. The disk needs to do work in 4 cases:
     /// 1) in_flight_ranges is not empty, the disk needs to read for this reader.
     /// 2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
     ///    read next.
@@ -426,15 +349,7 @@ class RequestContext {
     /// useful that can be done. If there's nothing useful, the disk queue will wake up
     /// and then remove the reader from the queue. Doing this causes thrashing of the
     /// threads.
-    ///
-    /// This variable is important during context cancellation because it indicates
-    /// whether a queue has a reference to the context that must be released before
-    /// the context is considered unregistered. Atomically set to false after
-    /// incrementing 'num_threads_in_op_' when dequeueing so that there is no window
-    /// when other threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there
-    /// are no references left to this context.
-    /// TODO: this could be combined with 'num_threads_in_op_' to be a single refcount.
-    AtomicInt32 is_on_queue_{0};
+    bool is_on_queue_ = false;
 
     /// For each disk, the number of request ranges that have not been fully read.
     /// In the non-cancellation path, this will hit 0, and done will be set to true
@@ -448,7 +363,7 @@ class RequestContext {
 
     /// Queue of pending IO requests for this disk in the order that they will be
     /// processed. A ScanRange is added to this queue when it is returned in
-    /// GetNextUnstartedRange(), or when it is added with schedule_mode == IMMEDIATELY.
+    /// GetNextRange(), or when it is added with schedule_immediately = true.
     /// A WriteRange is added to this queue from unstarted_write_ranges_ for each
     /// invocation of GetNextRequestRange() in WorkLoop().
     /// The size of this queue is always less than or equal to num_remaining_ranges.
@@ -464,11 +379,11 @@ class RequestContext {
     /// range to ready_to_start_ranges_.
     ScanRange* next_scan_range_to_start_ = nullptr;
 
-    /// For each disk, the number of disk threads issuing the underlying read/write on
-    /// behalf of this context. There are a few places where we release the context lock,
-    /// do some work, and then grab the lock again. Because we don't hold the lock for
-    /// the entire operation, we need this ref count to keep track of which thread should
-    /// do final resource cleanup during cancellation.
+    /// For each disk, the number of threads issuing the underlying read/write on behalf
+    /// of this context. There are a few places where we release the context lock, do some
+    /// work, and then grab the lock again. Because we don't hold the lock for the
+    /// entire operation, we need this ref count to keep track of which thread should do
+    /// final resource cleanup during cancellation.
     /// Only the thread that sees the count at 0 should do the final cleanup.
     AtomicInt32 num_threads_in_op_{0};
 
@@ -477,8 +392,7 @@ class RequestContext {
     /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate between reads
     /// and writes. (Otherwise, since next_scan_range_to_start is set
     /// in GetNextRequestRange() whenever it is null, repeated calls to
-    /// GetNextRequestRange() and GetNextUnstartedRange() may result in only reads being
-    /// processed)
+    /// GetNextRequestRange() and GetNextRange() may result in only reads being processed)
    InternalQueue<WriteRange> unstarted_write_ranges_;
  };
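The five scan-range states described in the class comment above form a small state machine. As a reading aid, here is a standalone C++ sketch that encodes the documented transitions (1 -> 2 -> 3, with 3 -> (4 -> 3)* under back pressure); the enum and function names are hypothetical, not part of the patch.

#include <cassert>

// The five queue states from the class comment, plus a terminal Done state.
enum class RangeState { Unstarted, ReadyToStart, InFlight, Blocked, Cached, Done };

// Legal transitions per the comment: 1 -> 2 -> 3, with 3 -> (4 -> 3)* when the
// ready-buffer queue fills up; a cached range either finishes immediately or,
// if the cached read fails, restarts in state 1.
bool LegalTransition(RangeState from, RangeState to) {
  switch (from) {
    case RangeState::Unstarted: return to == RangeState::ReadyToStart;
    case RangeState::ReadyToStart: return to == RangeState::InFlight;
    case RangeState::InFlight:
      return to == RangeState::Blocked || to == RangeState::Done;
    case RangeState::Blocked: return to == RangeState::InFlight;
    case RangeState::Cached:
      return to == RangeState::Done || to == RangeState::Unstarted;
    case RangeState::Done: return false;
  }
  return false;
}

int main() {
  assert(LegalTransition(RangeState::InFlight, RangeState::Blocked));  // 3 -> 4
  assert(LegalTransition(RangeState::Blocked, RangeState::InFlight));  // 4 -> 3
  assert(!LegalTransition(RangeState::Blocked, RangeState::Done));     // must go via 3
}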
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 0b234ac..222f847 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -23,15 +23,14 @@
 
 #include <boost/thread/mutex.hpp>
 
-#include "common/atomic.h"
 #include "common/hdfs.h"
 #include "common/status.h"
-#include "runtime/bufferpool/buffer-pool.h"
 #include "util/condition-variable.h"
 #include "util/internal-queue.h"
-#include "util/mem-range.h"
 
 namespace impala {
+class MemTracker;
+
 namespace io {
 class DiskIoMgr;
 class RequestContext;
@@ -56,19 +55,24 @@ class BufferDescriptor {
   /// Returns the offset within the scan range that this buffer starts at
   int64_t scan_range_offset() const { return scan_range_offset_; }
 
+  /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set
+  /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not
+  /// check memory limits on 'dst': the caller should check the memory limit if a
+  /// different memory limit may apply to 'dst'. If the buffer was a client-provided
+  /// buffer, transferring is not allowed.
+  /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
+  void TransferOwnership(MemTracker* dst);
+
  private:
   friend class DiskIoMgr;
   friend class ScanRange;
   friend class RequestContext;
 
-  /// Create a buffer descriptor for a new reader, range and data buffer.
-  BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
-      ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len);
-
-  /// Create a buffer descriptor allocated from the buffer pool.
+  /// Create a buffer descriptor for a new reader, range and data buffer. The buffer
+  /// memory should already be accounted against 'mem_tracker'.
   BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
-      ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
-      BufferPool::BufferHandle handle);
+      ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len,
+      MemTracker* mem_tracker);
 
   /// Return true if this is a cached buffer owned by HDFS.
   bool is_cached() const;
@@ -82,11 +86,14 @@ class BufferDescriptor {
   /// Reader that this buffer is for.
   RequestContext* const reader_;
 
+  /// The current tracker this buffer is associated with. After initialisation,
+  /// NULL for cached buffers and non-NULL for all other buffers.
+  MemTracker* mem_tracker_;
+
   /// Scan range that this buffer is for. Non-NULL when initialised.
   ScanRange* const scan_range_;
 
-  /// Buffer for the read contents. Must be set to NULL in RequestContext::FreeBuffer()
-  /// before destruction of the descriptor.
+  /// Buffer with the read contents.
   uint8_t* buffer_;
 
   /// length of buffer_. For buffers from cached reads, the length is 0.
@@ -98,12 +105,10 @@ class BufferDescriptor {
   /// true if the current scan range is complete
   bool eosr_ = false;
 
-  int64_t scan_range_offset_ = 0;
+  /// Status of the read to this buffer. If status is not ok, 'buffer' is nullptr.
+  Status status_;
 
-  // Handle to an allocated buffer and the client used to allocate the buffer. Only used
-  // for non-external buffers.
-  BufferPool::ClientHandle* bp_client_ = nullptr;
-  BufferPool::BufferHandle handle_;
+  int64_t scan_range_offset_ = 0;
 };
 
 /// The request type, read or write associated with a request range.
@@ -211,11 +216,11 @@ class ScanRange : public RequestRange {
 
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
-  /// local filesystem). The scan range must be non-empty and fall within the file bounds
-  /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk
-  /// queue to add the range to. If 'expected_local' is true, a warning is generated if
-  /// the read did not come from a local disk. 'buffer_opts' specifies buffer management
-  /// options - see the DiskIoMgr class comment and the BufferOpts comments for details.
+  /// local filesystem). The scan range must fall within the file bounds (offset >= 0
+  /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
+  /// to. If 'expected_local' is true, a warning is generated if the read did not
+  /// come from a local disk. 'buffer_opts' specifies buffer management options -
+  /// see the DiskIoMgr class comment and the BufferOpts comments for details.
  /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
      bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
@@ -231,17 +236,10 @@ class ScanRange : public RequestRange {
   /// Only one thread can be in GetNext() at any time.
   Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
 
-  /// Returns the buffer to the scan range. This must be called for every buffer
-  /// returned by GetNext(). After calling this, the buffer descriptor is invalid
-  /// and cannot be accessed.
-  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
-
-  /// Cancel this scan range. This cleans up all queued buffers and wakes up any threads
-  /// blocked on GetNext(). Status is a non-ok status with the reason the range was
-  /// cancelled, e.g. CANCELLED if the range was cancelled because it was not needed, or
-  /// another error if an error was encountered while scanning the range. Status is
-  /// returned to any callers of GetNext(). If a thread is currently blocked in
-  /// GetNext(), it is woken up.
+  /// Cancel this scan range. This cleans up all queued buffers and
+  /// wakes up any threads blocked on GetNext().
+  /// Status is the reason the range was cancelled. Must not be ok().
+  /// Status is returned to the user in GetNext().
   void Cancel(const Status& status);
 
   /// return a descriptive string for debug.
@@ -257,13 +255,18 @@ class ScanRange : public RequestRange {
   /// Initialize internal fields
   void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
 
-  /// Enqueues a ready buffer with valid data for this range. This does not block.
+  /// Enqueues a buffer for this range. This does not block.
+  /// Returns true if this scan range has hit the queue capacity, false otherwise.
   /// The caller passes ownership of buffer to the scan range and it is not
   /// valid to access buffer after this call. The reader lock must be held by the
-  /// caller. Returns false if the scan range was cancelled.
-  bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+  /// caller.
+  bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
      std::unique_ptr<BufferDescriptor> buffer);
 
+  /// Cleanup any queued buffers (i.e. due to cancellation). This cannot
+  /// be called with any locks taken.
+  void CleanupQueuedBuffers();
+
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this.
   bool Validate();
@@ -280,10 +283,6 @@ class ScanRange : public RequestRange {
   /// exclusive use by this scan range. The scan range is the exclusive owner of the
   /// file handle, and the file handle is destroyed in Close().
   /// All local OS files are opened using normal OS file APIs.
-  ///
-  /// If an error is encountered during opening, returns a status describing the error.
-  /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on
-  /// success, returns OK.
   Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
 
   /// Closes the file for this range. This function only modifies state in this range.
@@ -291,10 +290,6 @@ class ScanRange : public RequestRange {
 
   /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns
   /// the number of bytes read. The read position in this scan range is updated.
-  ///
-  /// If an error is encountered during reading, returns a status describing the error.
-  /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on
-  /// success, returns OK.
   Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr)
      WARN_UNUSED_RESULT;
 
@@ -312,58 +307,6 @@ class ScanRange : public RequestRange {
   Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
      bool* read_succeeded) WARN_UNUSED_RESULT;
 
-  /// Add buffers for the range to read data into and schedule the range if blocked.
-  /// If 'returned' is true, the buffers returned from GetNext() that are being recycled
-  /// via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to be added.
-  void AddUnusedBuffers(
-      std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned);
-
-  /// Remove a buffer from 'unused_iomgr_buffers_' and update
-  /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, return NULL.
-  /// 'lock_' must be held by the caller via 'scan_range_lock'.
-  std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
-      const boost::unique_lock<boost::mutex>& scan_range_lock);
-
-  /// Get the next buffer for this scan range for a disk thread to read into. Returns
-  /// the new buffer if successful. If no buffers are available, marks the range
-  /// as blocked and returns nullptr. Caller must not hold 'lock_'.
-  std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange();
-
-  /// Cleans up a buffer that was not returned to the client.
-  /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
-  /// The caller must hold 'lock_' via 'scan_range_lock'.
-  /// This function may acquire 'hdfs_lock_'.
-  void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock,
-      std::unique_ptr<BufferDescriptor> buffer);
-
-  /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not
-  /// hold 'lock_'.
-  void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers);
-
-  /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when the scan
-  /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'.
-  void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock);
-
-  /// Same as Cancel() except reader_->lock must be held by the caller.
-  void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
-      const Status& status);
-
-  /// Same as Cancel() except doesn't remove the scan range from
-  /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(),
-  /// which removes the range itself to avoid invalidating its active_scan_ranges_
-  /// iterator.
-  void CancelInternal(const Status& status);
-
-  /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
-  void SetBlockedOnBuffer();
-
-  /// Returns true if no more buffers will be returned to clients in the future,
-  /// either because of hitting eosr or cancellation.
-  bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const {
-    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-    return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
-  }
-
   /// Pointer to caller specified metadata. This is untouched by the io manager
   /// and the caller can put whatever auxiliary data in here.
   void* meta_data_ = nullptr;
@@ -380,9 +323,6 @@ class ScanRange : public RequestRange {
   /// TODO: we can do more with this
   bool expected_local_ = false;
 
-  /// Last modified time of the file associated with the scan range. Set in Reset().
-  int64_t mtime_;
-
   /// Total number of bytes read remotely. This is necessary to maintain a count of
   /// the number of remote scan ranges. Since IO statistics can be collected multiple
   /// times for a scan range, it is necessary to keep some state about whether this
@@ -429,10 +369,6 @@ class ScanRange : public RequestRange {
     struct hadoopRzBuffer* cached_buffer_ = nullptr;
   };
 
-  /// The number of buffers that have been returned to a client via GetNext() that have
-  /// not yet been returned with ReturnBuffer().
-  AtomicInt32 num_buffers_in_reader_{0};
-
   /// Lock protecting fields below.
   /// This lock should not be taken during Open()/Read()/Close().
   /// If RequestContext::lock_ and this lock need to be held simultaneously,
@@ -442,40 +378,25 @@ class ScanRange : public RequestRange {
   /// Number of bytes read so far for this scan range
   int bytes_read_;
 
-  /// Buffers to read into, used if the 'external_buffer_tag_' is NO_BUFFER. These are
-  /// initially populated when the client calls AllocateBuffersForRange() and
-  /// and are used to read scanned data into. Buffers are taken from this vector for
-  /// every read and added back, if needed, when the client calls ReturnBuffer().
-  std::vector<std::unique_ptr<BufferDescriptor>> unused_iomgr_buffers_;
-
-  /// Total number of bytes of buffers in 'unused_iomgr_buffers_'.
-  int64_t unused_iomgr_buffer_bytes_ = 0;
-
-  /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to
-  /// infer how many bytes of buffers need to be held onto to read the rest of the scan
-  /// range.
-  int64_t iomgr_buffer_bytes_returned_ = 0;
+  /// Status for this range. This is non-ok if is_cancelled_ is true.
+  /// Note: an individual range can fail without the RequestContext being
+  /// cancelled. This allows us to skip individual ranges.
+  Status status_;
 
   /// If true, the last buffer for this scan range has been queued.
-  /// If this is true and 'ready_buffers_' is empty, then no more buffers will be
-  /// returned to the caller by this scan range.
   bool eosr_queued_ = false;
 
-  /// If true, this scan range is not scheduled because a buffer is not available for
-  /// the next I/O in the range. This can happen when the scan range is initially created
-  /// or if the buffers added to the range have all been filled with data and not yet
-  /// returned.
-  bool blocked_on_buffer_ = false;
+  /// If true, the last buffer for this scan range has been returned.
+  bool eosr_returned_ = false;
 
-  /// IO buffers that are queued for this scan range. When Cancel() is called
-  /// this is drained by the cancelling thread. I.e. this is always empty if
-  /// 'cancel_status_' is not OK.
-  std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
+  /// If true, this scan range has been removed from the reader's in_flight_ranges
+  /// queue because the ready_buffers_ queue is full.
+  bool blocked_on_queue_ = false;
 
-  /// Condition variable for threads in GetNext() that are waiting for the next buffer.
-  /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is
-  /// cancelled.
+  /// IO buffers that are queued for this scan range.
+  /// Condition variable for GetNext.
   ConditionVariable buffer_ready_cv_;
+  std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
   /// Lock that should be taken during hdfs calls. Only one thread (the disk reading
   /// thread) calls into hdfs at a time so this lock does not have performance impact.
@@ -485,16 +406,11 @@ class ScanRange : public RequestRange {
   /// If this lock and lock_ need to be taken, lock_ must be taken first.
   boost::mutex hdfs_lock_;
 
-  /// If non-OK, this scan range has been cancelled. This status is the reason for
-  /// cancellation - CANCELLED if cancelled without error, or another status if an
-  /// error caused cancellation. Note that a range can be cancelled without cancelling
-  /// the owning context. This means that ranges can be cancelled or hit errors without
-  /// aborting all scan ranges.
-  //
-  /// Writers must hold both 'lock_' and 'hdfs_lock_'. Readers must hold either 'lock_'
-  /// or 'hdfs_lock_'. This prevents the range from being cancelled while any thread
-  /// is inside a critical section.
-  Status cancel_status_;
+  /// If true, this scan range has been cancelled.
+  bool is_cancelled_ = false;
+
+  /// Last modified time of the file associated with the scan range
+  int64_t mtime_;
 };
 
 /// Used to specify data to be written to a file and offset.
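The blocked_on_queue_/ready_buffers_ fields above implement back pressure between disk threads and GetNext(). Below is a minimal standalone C++ model of that producer/consumer handshake; kReadyBufferLimit is a hypothetical stand-in for DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT, and unlike the real code the toy producer keeps producing instead of parking the range on blocked_ranges_.

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical capacity, standing in for DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT.
constexpr size_t kReadyBufferLimit = 4;

class ReadyQueue {
 public:
  // Producer side (disk thread): enqueue a buffer and report whether the queue
  // just reached capacity, so the caller can stop scheduling the range.
  bool Enqueue(std::vector<uint8_t> buffer) {
    bool at_capacity;
    {
      std::lock_guard<std::mutex> l(lock_);
      ready_.push_back(std::move(buffer));
      at_capacity = ready_.size() >= kReadyBufferLimit;
    }
    buffer_ready_cv_.notify_one();
    return at_capacity;
  }

  // Consumer side (client in GetNext()): block until a buffer is ready.
  std::vector<uint8_t> Dequeue() {
    std::unique_lock<std::mutex> l(lock_);
    buffer_ready_cv_.wait(l, [this] { return !ready_.empty(); });
    std::vector<uint8_t> b = std::move(ready_.front());
    ready_.pop_front();
    return b;
  }

 private:
  std::mutex lock_;
  std::condition_variable buffer_ready_cv_;
  std::deque<std::vector<uint8_t>> ready_;
};

int main() {
  ReadyQueue q;
  std::thread producer([&q] {
    for (int i = 0; i < 8; ++i) {
      if (q.Enqueue(std::vector<uint8_t>(64))) {
        std::cout << "queue full after buffer " << i << "\n";  // Range would block here.
      }
    }
  });
  for (int i = 0; i < 8; ++i) q.Dequeue();
  producer.join();
}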
-bool ScanRange::EnqueueReadyBuffer( +bool ScanRange::EnqueueBuffer( const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) { DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); - DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer"; { unique_lock<mutex> scan_range_lock(lock_); DCHECK(Validate()) << DebugString(); + DCHECK(!eosr_returned_); DCHECK(!eosr_queued_); - if (!cancel_status_.ok()) { - // This range has been cancelled, no need to enqueue the buffer. - CleanUpBuffer(scan_range_lock, move(buffer)); + if (is_cancelled_) { + // Return the buffer, this range has been cancelled + if (buffer->buffer_ != nullptr) { + io_mgr_->num_buffers_in_readers_.Add(1); + reader_->num_buffers_in_reader_.Add(1); + } + reader_->num_used_buffers_.Add(-1); + io_mgr_->ReturnBuffer(move(buffer)); return false; } - // Clean up any surplus buffers. E.g. we may have allocated too many if the file was - // shorter than expected. - if (buffer->eosr()) CleanUpUnusedBuffers(scan_range_lock); + reader_->num_ready_buffers_.Add(1); eosr_queued_ = buffer->eosr(); ready_buffers_.emplace_back(move(buffer)); + + DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); + blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT; } + buffer_ready_cv_.NotifyOne(); - return true; + + return blocked_on_queue_; } Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { @@ -70,225 +78,123 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { bool eosr; { unique_lock<mutex> scan_range_lock(lock_); + if (eosr_returned_) return Status::OK(); DCHECK(Validate()) << DebugString(); - // No more buffers to return - return the cancel status or OK if not cancelled. - if (all_buffers_returned(scan_range_lock)) return cancel_status_; - while (ready_buffers_.empty() && cancel_status_.ok()) { + while (ready_buffers_.empty() && !is_cancelled_) { buffer_ready_cv_.Wait(scan_range_lock); } - /// Propagate cancellation to the client if it happened while we were waiting. - RETURN_IF_ERROR(cancel_status_); + + if (is_cancelled_) { + DCHECK(!status_.ok()); + return status_; + } // Remove the first ready buffer from the queue and return it DCHECK(!ready_buffers_.empty()); + DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); *buffer = move(ready_buffers_.front()); ready_buffers_.pop_front(); + eosr_returned_ = (*buffer)->eosr(); eosr = (*buffer)->eosr(); - DCHECK(!eosr || unused_iomgr_buffers_.empty()) << DebugString(); - } - - // Update tracking counters. The buffer has now moved from the IoMgr to the caller. - if (eosr) reader_->RemoveActiveScanRange(this); - num_buffers_in_reader_.Add(1); - return Status::OK(); -} - -void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) { - vector<unique_ptr<BufferDescriptor>> buffers; - buffers.emplace_back(move(buffer_desc)); - AddUnusedBuffers(move(buffers), true); -} - -void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers, - bool returned) { - DCHECK_GT(buffers.size(), 0); - /// Keep track of whether the range was unblocked in this function. If so, we need - /// to schedule it so it resumes progress. - bool unblocked = false; - { - unique_lock<mutex> scan_range_lock(lock_); - if (returned) { - // Buffers were in reader but now aren't. - num_buffers_in_reader_.Add(-buffers.size()); - } - - for (unique_ptr<BufferDescriptor>& buffer : buffers) { - // We should not hold onto the buffers in the following cases: - // 1. 
-      // 2. the scan range is cancelled
-      // 3. the scan range already hit eosr
-      // 4. we already have enough buffers to read the remainder of the scan range.
-      if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
-          || !cancel_status_.ok()
-          || eosr_queued_
-          || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) {
-        CleanUpBuffer(scan_range_lock, move(buffer));
-      } else {
-        unused_iomgr_buffer_bytes_ += buffer->buffer_len();
-        unused_iomgr_buffers_.emplace_back(move(buffer));
-        if (blocked_on_buffer_) {
-          blocked_on_buffer_ = false;
-          unblocked = true;
-        }
-      }
-    }
-  }
-  // Must drop the ScanRange lock before acquiring the RequestContext lock.
-  if (unblocked) {
-    unique_lock<mutex> reader_lock(reader_->lock_);
-    // Reader may have been cancelled after we dropped 'scan_range_lock' above.
-    if (reader_->state_ == RequestContext::Cancelled) {
-      DCHECK(!cancel_status_.ok());
-    } else {
-      reader_->ScheduleScanRange(reader_lock, this);
-    }
-  }
-}
-
-unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
-    const unique_lock<mutex>& scan_range_lock) {
-  DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
-  if (unused_iomgr_buffers_.empty()) return nullptr;
-  unique_ptr<BufferDescriptor> result = move(unused_iomgr_buffers_.back());
-  unused_iomgr_buffers_.pop_back();
-  unused_iomgr_buffer_bytes_ -= result->buffer_len();
-  return result;
-}
-
-unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() {
-  unique_lock<mutex> lock(lock_);
-  unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock);
-  if (buffer_desc == nullptr) {
-    blocked_on_buffer_ = true;
-  } else {
-    iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len();
+  // Update tracking counters. The buffer has now moved from the IoMgr to the
+  // caller.
+  io_mgr_->num_buffers_in_readers_.Add(1);
+  reader_->num_buffers_in_reader_.Add(1);
+  reader_->num_ready_buffers_.Add(-1);
+  reader_->num_used_buffers_.Add(-1);
+  if (eosr) reader_->num_finished_ranges_.Add(1);
+
+  Status status = (*buffer)->status_;
+  if (!status.ok()) {
+    io_mgr_->ReturnBuffer(move(*buffer));
+    return status;
   }
-  return buffer_desc;
-}

-void ScanRange::SetBlockedOnBuffer() {
-  unique_lock<mutex> lock(lock_);
-  blocked_on_buffer_ = true;
-}
+  unique_lock<mutex> reader_lock(reader_->lock_);

-void ScanRange::CleanUpBuffer(
-    const boost::unique_lock<boost::mutex>& scan_range_lock,
-    unique_ptr<BufferDescriptor> buffer_desc) {
-  DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
-  DCHECK(buffer_desc != nullptr);
-  DCHECK_EQ(this, buffer_desc->scan_range_);
-  buffer_desc->reader_->FreeBuffer(buffer_desc.get());
-
-  if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() == 0) {
-    // Close the scan range if there are no more buffers in the reader and no more buffers
-    // will be returned to readers in future. Close() is idempotent so it is ok to call
-    // multiple times during cleanup so long as the range is actually finished.
-    Close();
+  DCHECK(reader_->Validate()) << endl << reader_->DebugString();
+  if (reader_->state_ == RequestContext::Cancelled) {
+    reader_->blocked_ranges_.Remove(this);
+    Cancel(reader_->status_);
+    io_mgr_->ReturnBuffer(move(*buffer));
+    return status_;
   }
-}
-
-void ScanRange::CleanUpBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers) {
-  unique_lock<mutex> lock(lock_);
-  for (unique_ptr<BufferDescriptor>& buffer : buffers) CleanUpBuffer(lock, move(buffer));
-}
-
-void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock) {
-  while (!unused_iomgr_buffers_.empty()) {
-    CleanUpBuffer(scan_range_lock, GetUnusedBuffer(scan_range_lock));
+  {
+    // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
+    // may have been called after we released 'lock_' above so we need to re-check
+    // whether the queue is full.
+    unique_lock<mutex> scan_range_lock(lock_);
+    if (blocked_on_queue_
+        && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT
+        && !eosr_queued_) {
+      blocked_on_queue_ = false;
+      // This scan range was blocked and is no longer, so add it to the reader
+      // queue again.
+      reader_->blocked_ranges_.Remove(this);
+      reader_->ScheduleScanRange(this);
+    }
   }
+  return Status::OK();
 }

 void ScanRange::Cancel(const Status& status) {
   // Cancelling a range that was never started, ignore.
   if (io_mgr_ == nullptr) return;
-  CancelInternal(status);
-  reader_->RemoveActiveScanRange(this);
-}
-
-void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
-    const Status& status) {
-  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
-  CancelInternal(status);
-  reader_->RemoveActiveScanRangeLocked(reader_lock, this);
-}
-
-void ScanRange::CancelInternal(const Status& status) {
-  DCHECK(io_mgr_ != nullptr);
   DCHECK(!status.ok());
   {
-    // Grab both locks to make sure that we don't change 'cancel_status_' while other
-    // threads are in critical sections.
+    // Grab both locks to make sure that all working threads see is_cancelled_.
     unique_lock<mutex> scan_range_lock(lock_);
-    {
-      unique_lock<mutex> hdfs_lock(hdfs_lock_);
-      DCHECK(Validate()) << DebugString();
-      // If already cancelled, preserve the original reason for cancellation. The first
-      // thread to set 'cancel_status_' does the cleanup below.
-      RETURN_VOID_IF_ERROR(cancel_status_);
-      cancel_status_ = status;
-    }
-
-    /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
-    /// from seeing inconsistent state.
-    while (!ready_buffers_.empty()) {
-      CleanUpBuffer(scan_range_lock, move(ready_buffers_.front()));
-      ready_buffers_.pop_front();
-    }
-
-    /// Clean up buffers that we don't need any more because we won't read any more data.
-    CleanUpUnusedBuffers(scan_range_lock);
+    unique_lock<mutex> hdfs_lock(hdfs_lock_);
+    DCHECK(Validate()) << DebugString();
+    if (is_cancelled_) return;
+    is_cancelled_ = true;
+    status_ = status;
   }
   buffer_ready_cv_.NotifyAll();
+  CleanupQueuedBuffers();

   // For cached buffers, we can't close the range until the cached buffer is returned.
-  // Close() is called from ScanRange::CleanUpBufferLocked().
+  // Close() is called from DiskIoMgr::ReturnBuffer().
   if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
 }

+void ScanRange::CleanupQueuedBuffers() {
+  DCHECK(is_cancelled_);
+  io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
+  reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
+  reader_->num_used_buffers_.Add(-ready_buffers_.size());
+  reader_->num_ready_buffers_.Add(-ready_buffers_.size());
+
+  while (!ready_buffers_.empty()) {
+    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
+    ready_buffers_.pop_front();
+  }
+}
+
 string ScanRange::DebugString() const {
   stringstream ss;
   ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
      << " len=" << len_ << " bytes_read=" << bytes_read_
-     << " cancel_status=" << cancel_status_.GetDetail()
      << " buffer_queue=" << ready_buffers_.size()
-     << " num_buffers_in_readers=" << num_buffers_in_reader_.Load()
-     << " unused_iomgr_buffers=" << unused_iomgr_buffers_.size()
-     << " unused_iomgr_buffer_bytes=" << unused_iomgr_buffer_bytes_
-     << " blocked_on_buffer=" << blocked_on_buffer_
      << " hdfs_file=" << exclusive_hdfs_fh_;
   return ss.str();
 }

 bool ScanRange::Validate() {
   if (bytes_read_ > len_) {
-    LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
+    LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
                << " bytes_read_=" << bytes_read_ << " len_=" << len_;
     return false;
   }
-  if (!cancel_status_.ok() && !ready_buffers_.empty()) {
-    LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString();
-    return false;
-  }
-  int64_t unused_iomgr_buffer_bytes = 0;
-  for (auto& buffer : unused_iomgr_buffers_)
-    unused_iomgr_buffer_bytes += buffer->buffer_len();
-  if (unused_iomgr_buffer_bytes != unused_iomgr_buffer_bytes_) {
-    LOG(ERROR) << "unused_iomgr_buffer_bytes_ incorrect actual: "
-               << unused_iomgr_buffer_bytes_
-               << " vs. expected: " << unused_iomgr_buffer_bytes;
expected: " << unused_iomgr_buffer_bytes; - return false; - } - bool is_finished = !cancel_status_.ok() || eosr_queued_; - if (is_finished && !unused_iomgr_buffers_.empty()) { - LOG(ERROR) << "Held onto too many buffers " << unused_iomgr_buffers_.size() - << " bytes: " << unused_iomgr_buffer_bytes_ - << " cancel_status: " << cancel_status_.GetDetail() - << " eosr_queued: " << eosr_queued_; - return false; - } - if (!is_finished && blocked_on_buffer_ && !unused_iomgr_buffers_.empty()) { - LOG(ERROR) << "Blocked despite having buffers: " << DebugString(); + if (eosr_returned_ && !eosr_queued_) { + LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range" + << " eosr_returned_=" << eosr_returned_ + << " eosr_queued_=" << eosr_queued_; return false; } return true; @@ -297,14 +203,13 @@ bool ScanRange::Validate() { ScanRange::ScanRange() : RequestRange(RequestType::READ), num_remote_bytes_(0), - external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {} + external_buffer_tag_(ExternalBufferTag::NO_BUFFER), + mtime_(-1) {} ScanRange::~ScanRange() { DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed."; DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) << "Cached buffer was not released."; - DCHECK_EQ(0, ready_buffers_.size()); - DCHECK_EQ(0, num_buffers_in_reader_.Load()); } void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, @@ -340,22 +245,24 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) { DCHECK(exclusive_hdfs_fh_ == nullptr); DCHECK(local_file_ == nullptr); + // Reader must provide MemTracker or a buffer. + DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER + || reader->mem_tracker_ != nullptr); io_mgr_ = io_mgr; reader_ = reader; local_file_ = nullptr; exclusive_hdfs_fh_ = nullptr; bytes_read_ = 0; - unused_iomgr_buffer_bytes_ = 0; - iomgr_buffer_bytes_returned_ = 0; - cancel_status_ = Status::OK(); - eosr_queued_ = false; - blocked_on_buffer_ = false; + is_cancelled_ = false; + eosr_queued_= false; + eosr_returned_= false; + blocked_on_queue_ = false; DCHECK(Validate()) << DebugString(); } Status ScanRange::Open(bool use_file_handle_cache) { unique_lock<mutex> hdfs_lock(hdfs_lock_); - RETURN_IF_ERROR(cancel_status_); + if (is_cancelled_) return Status::CANCELLED; if (fs_ != nullptr) { if (exclusive_hdfs_fh_ != nullptr) return Status::OK(); @@ -395,7 +302,9 @@ Status ScanRange::Open(bool use_file_handle_cache) { "for file: $1: $2", offset_, file_, GetStrErrMsg())); } } - ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L); + if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) { + ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L); + } return Status::OK(); } @@ -447,7 +356,9 @@ void ScanRange::Close() { local_file_ = nullptr; closed_file = true; } - if (closed_file) ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L); + if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) { + ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L); + } } int64_t ScanRange::MaxReadChunkSize() const { @@ -475,7 +386,7 @@ int64_t ScanRange::MaxReadChunkSize() const { Status ScanRange::Read( uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) { unique_lock<mutex> hdfs_lock(hdfs_lock_); - RETURN_IF_ERROR(cancel_status_); + if (is_cancelled_) return Status::CANCELLED; *eosr = false; *bytes_read = 0; @@ -622,7 +533,7 @@ Status ScanRange::ReadFromCache( { unique_lock<mutex> 
-    RETURN_IF_ERROR(cancel_status_);
+    if (is_cancelled_) return Status::CANCELLED;

     DCHECK(exclusive_hdfs_fh_ != nullptr);
     DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
@@ -657,16 +568,20 @@ Status ScanRange::ReadFromCache(
   }

   // Create a single buffer desc for the entire scan range and enqueue that.
-  // The memory is owned by the HDFS java client, not the Impala backend.
+  // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
+  // not the Impala backend.
   unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
-      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0));
+      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;
   bytes_read_ = bytes_read;
-  EnqueueReadyBuffer(reader_lock, move(desc));
-  COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
+  EnqueueBuffer(reader_lock, move(desc));
+  if (reader_->bytes_read_counter_ != nullptr) {
+    COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
+  }
   *read_succeeded = true;
+  reader_->num_used_buffers_.Add(1);
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 8d28f8f..10a3424 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -107,6 +107,7 @@ class MemTracker {
   /// destruction to prevent other threads from getting a reference to the MemTracker
   /// via its parent. Only used to deregister the query-level MemTracker from the
   /// global hierarchy.
+  /// TODO: IMPALA-3200: this is also used by BufferedBlockMgr, which will be deleted.
   void CloseAndUnregisterFromParent();

   /// Include counters from a ReservationTracker in logs and other diagnostics.

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 0d6c6f0..770eaba 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -49,7 +49,7 @@ Status TestEnv::Init() {
   exec_env_.reset(new ExecEnv);
   // Populate the ExecEnv state that the backend tests need.
   exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process"));
-  RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
+  RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker()));
   exec_env_->metrics_.reset(new MetricGroup("test-env-metrics"));
   exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
   if (have_tmp_file_mgr_args_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 9161b63..3091c58 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -25,7 +25,6 @@
 #include <gtest/gtest.h>

 #include "common/init.h"
-#include "runtime/io/request-context.h"
 #include "runtime/test-env.h"
 #include "runtime/tmp-file-mgr-internal.h"
 #include "runtime/tmp-file-mgr.h"
@@ -135,7 +134,7 @@ class TmpFileMgrTest : public ::testing::Test {

   /// Helper to cancel the FileGroup RequestContext.
   static void CancelIoContext(TmpFileMgr::FileGroup* group) {
-    group->io_ctx_->Cancel();
+    group->io_mgr_->CancelContext(group->io_ctx_.get());
   }

   /// Helper to get the # of bytes allocated by the group. Validates that the sum across

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index e0c58d4..3807670 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -243,7 +243,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     next_allocation_index_(0), free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
-  io_ctx_ = io_mgr_->RegisterContext();
+  io_ctx_ = io_mgr_->RegisterContext(nullptr);
 }

 TmpFileMgr::FileGroup::~FileGroup() {
@@ -396,10 +396,7 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
       BufferOpts::ReadInto(buffer.data(), buffer.len()));
   read_counter_->Add(1);
   bytes_read_counter_->Add(buffer.len());
-  bool needs_buffers;
-  RETURN_IF_ERROR(io_mgr_->StartScanRange(
-      io_ctx_.get(), handle->read_range_, &needs_buffers));
-  DCHECK(!needs_buffers) << "Already provided a buffer";
+  RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true));
   return Status::OK();
 }

@@ -429,7 +426,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf
   }
 exit:
   // Always return the buffer before exiting to avoid leaking it.
-  if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer));
+  if (io_mgr_buffer != nullptr) io_mgr_->ReturnBuffer(move(io_mgr_buffer));
   handle->read_range_ = nullptr;
   return status;
 }

@@ -525,20 +522,11 @@ TmpFileMgr::WriteHandle::WriteHandle(
     is_cancelled_(false),
     write_in_flight_(false) {}

-TmpFileMgr::WriteHandle::~WriteHandle() {
-  DCHECK(!write_in_flight_);
-  DCHECK(read_range_ == nullptr);
-}
-
 string TmpFileMgr::WriteHandle::TmpFilePath() const {
   if (file_ == nullptr) return "";
   return file_->path();
 }

-int64_t TmpFileMgr::WriteHandle::len() const {
-  return write_range_->len();
-}
-
 Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* io_ctx,
     File* file, int64_t offset, MemRange buffer,
     WriteRange::WriteDoneCallback callback) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 55901f4..95072ae 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,6 +28,7 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/io/request-ranges.h"
 #include "util/collection-metrics.h"
 #include "util/condition-variable.h"
 #include "util/mem-range.h"
@@ -36,12 +37,6 @@
 #include "util/spinlock.h"

 namespace impala {
-namespace io {
-  class DiskIoMgr;
-  class RequestContext;
-  class ScanRange;
-  class WriteRange;
-}

 /// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
 /// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
@@ -89,7 +84,6 @@ class TmpFileMgr {
   /// Needs to be public for TmpFileMgrTest.
   typedef int DeviceId;

-  /// Same typedef as io::WriteRange::WriteDoneCallback.
   typedef std::function<void(const Status&)> WriteDoneCallback;

   /// Represents a group of temporary files - one per disk with a scratch directory. The
@@ -283,7 +277,10 @@ class TmpFileMgr {
    public:
     /// The write must be destroyed by passing it to FileGroup - destroying it before
     /// the write completes is an error.
-    ~WriteHandle();
+    ~WriteHandle() {
+      DCHECK(!write_in_flight_);
+      DCHECK(read_range_ == nullptr);
+    }

     /// Cancel any in-flight read synchronously.
     void CancelRead();
@@ -293,7 +290,7 @@ class TmpFileMgr {
     std::string TmpFilePath() const;

     /// The length of the write range in bytes.
-    int64_t len() const;
+    int64_t len() const { return write_range_->len(); }

     std::string DebugString();

@@ -308,7 +305,7 @@ class TmpFileMgr {
     /// failure and 'is_cancelled_' is set to true on failure.
     Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
         int64_t offset, MemRange buffer,
-        WriteDoneCallback callback) WARN_UNUSED_RESULT;
+        io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;

     /// Retry the write after the initial write failed with an error, instead writing to
     /// 'offset' of 'file'. 'write_in_flight_' must be true before calling.

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 6f70727..5f8d443 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -258,17 +258,6 @@ TEST(BitUtil, Log2) {
   EXPECT_EQ(BitUtil::Log2CeilingNonZero64(ULLONG_MAX), 64);
 }

-TEST(BitUtil, RoundToPowerOfTwo) {
-  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(9));
-  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(15));
-  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(16));
-  EXPECT_EQ(32, BitUtil::RoundUpToPowerOfTwo(17));
-  EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(9));
-  EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(15));
-  EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(16));
-  EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(17));
-}
-
 TEST(BitUtil, RoundUpToPowerOf2) {
   EXPECT_EQ(BitUtil::RoundUpToPowerOf2(7, 8), 8);
   EXPECT_EQ(BitUtil::RoundUpToPowerOf2(8, 8), 8);

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 8a65509..5c9a29b 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -98,12 +98,6 @@ class BitUtil {
     return v;
   }

-  /// Returns the largest power of two <= v.
-  static inline int64_t RoundDownToPowerOfTwo(int64_t v) {
-    int64_t v_rounded_up = RoundUpToPowerOfTwo(v);
-    return v_rounded_up == v ? v : v_rounded_up / 2;
-  }
-
   /// Returns 'value' rounded up to the nearest multiple of 'factor' when factor is
   /// a power of two
   static inline int64_t RoundUpToPowerOf2(int64_t value, int64_t factor) {
@@ -111,7 +105,7 @@ class BitUtil {
     return (value + (factor - 1)) & ~(factor - 1);
   }

-  static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) {
+  static inline int RoundDownToPowerOf2(int value, int factor) {
     DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
     return value & ~(factor - 1);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 815e4af..c2a2644 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -46,6 +46,12 @@ const char* ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES =
     "impala-server.hash-table.total-bytes";
 const char* ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES =
     "impala-server.io-mgr.num-open-files";
+const char* ImpaladMetricKeys::IO_MGR_NUM_BUFFERS =
+    "impala-server.io-mgr.num-buffers";
+const char* ImpaladMetricKeys::IO_MGR_TOTAL_BYTES =
+    "impala-server.io-mgr.total-bytes";
+const char* ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS =
+    "impala-server.io-mgr.num-unused-buffers";
 const char* ImpaladMetricKeys::IO_MGR_BYTES_READ =
     "impala-server.io-mgr.bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ =
@@ -205,8 +211,11 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);

   // Initialize IO mgr metrics
-  IO_MGR_NUM_OPEN_FILES = m->AddGauge(
-      ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
+  IO_MGR_NUM_OPEN_FILES = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
+  IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
+  IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
+  IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge(
+      ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0);
   IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
   IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/util/impalad-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 7de7aa8..b49caf9 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -67,6 +67,15 @@ class ImpaladMetricKeys {
   /// Number of files currently opened by the io mgr
   static const char* IO_MGR_NUM_OPEN_FILES;

+  /// Number of IO buffers allocated by the io mgr
+  static const char* IO_MGR_NUM_BUFFERS;
+
+  /// Total number of bytes allocated to IO buffers (both in use and unused).
+  static const char* IO_MGR_TOTAL_BYTES;
+
+  /// Number of IO buffers that are currently unused (and can be GC'ed)
+  static const char* IO_MGR_NUM_UNUSED_BUFFERS;
+
   /// Total number of bytes read by the io mgr
   static const char* IO_MGR_BYTES_READ;

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 1ab05a0..c5df1cd 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -239,9 +239,6 @@ struct THdfsScanNode {
   // The byte offset of the slot for Parquet metadata if Parquet count star optimization
   // is enabled.
   10: optional i32 parquet_count_star_slot_offset
-
-  // The ideal memory reservation in bytes to process an input split.
-  11: optional i64 ideal_scan_range_reservation
 }

 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index aae3863..4f0a0e1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -163,25 +163,6 @@ public class SlotDescriptor {
   }

   /**
-   * Checks if this descriptor describes an array "pos" pseudo-column.
-   *
-   * Note: checking whether the column is null distinguishes between top-level columns
-   * and nested types. This check more specifically looks just for a reference to the
-   * "pos" field of an array type.
-   */
-  public boolean isArrayPosRef() {
-    if (parent_ == null) return false;
-    Type parentType = parent_.getType();
-    if (parentType instanceof CollectionStructType) {
-      if (((CollectionStructType)parentType).isArrayStruct() &&
-          label_.equals(Path.ARRAY_POS_FIELD_NAME)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * Assembles the absolute materialized path to this slot starting from the schema
    * root. The materialized path points to the first non-struct schema element along the
    * path starting from the parent's tuple path to this slot's path.
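
[A closing note on the bit-util.h hunk above: both rounding helpers depend on 'factor' being a power of two, so that factor - 1 is a mask of the low bits. A small standalone check of those identities (a hypothetical test program, not part of the patch):

#include <cassert>
#include <cstdint>

// Rounds 'value' up to the nearest multiple of 'factor' (a power of two):
// adding factor - 1 carries into the next multiple, and the mask clears the
// low bits.
static int64_t RoundUpToPowerOf2(int64_t value, int64_t factor) {
  return (value + (factor - 1)) & ~(factor - 1);
}

// Rounds 'value' down to the nearest multiple of 'factor' by clearing the
// low bits directly.
static int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) {
  return value & ~(factor - 1);
}

int main() {
  assert(RoundUpToPowerOf2(7, 8) == 8);
  assert(RoundUpToPowerOf2(8, 8) == 8);
  assert(RoundUpToPowerOf2(9, 8) == 16);
  assert(RoundDownToPowerOf2(7, 8) == 0);
  assert(RoundDownToPowerOf2(8, 8) == 8);
  assert(RoundDownToPowerOf2(9, 8) == 8);
  return 0;
}

The patch also narrows RoundDownToPowerOf2() from int64_t to int; the identities above hold for either width as long as the arithmetic does not overflow.]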