Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1534383&r1=1534382&r2=1534383&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Mon Oct 21 21:26:10 2013 @@ -37,7 +37,6 @@ namespace qls_jrnl #include "qpid/linearstore/jrnl/deq_rec.h" #include "qpid/linearstore/jrnl/enq_map.h" #include "qpid/linearstore/jrnl/enq_rec.h" -//#include "qpid/linearstore/jrnl/fcntl.h" #include "qpid/linearstore/jrnl/txn_map.h" #include "qpid/linearstore/jrnl/txn_rec.h" @@ -61,8 +60,7 @@ class JournalFile; { UNUSED, ///< A page is uninitialized, contains no data. IN_USE, ///< Page is in use. - AIO_PENDING, ///< An AIO request outstanding. - AIO_COMPLETE ///< An AIO request is complete. + AIO_PENDING ///< An AIO request outstanding. }; /** @@ -77,8 +75,6 @@ class JournalFile; uint32_t _wdblks; ///< Total number of dblks in page so far uint32_t _rdblks; ///< Total number of dblks in page std::deque<data_tok*>* _pdtokl; ///< Page message tokens list - //fcntl* _wfh; ///< File handle for incrementing write compl counts - //fcntl* _rfh; ///< File handle for incrementing read compl counts JournalFile* _jfp; ///< Journal file for incrementing compl counts void* _pbuff; ///< Page buffer @@ -113,7 +109,7 @@ class JournalFile; pmgr(jcntl* jc, enq_map& emap, txn_map& tmap); virtual ~pmgr(); - virtual int32_t get_events(page_state state, timespec* const timeout, bool flush = false) = 0; + virtual int32_t get_events(timespec* const timeout, bool flush) = 0; inline uint32_t get_aio_evt_rem() const { return _aio_evt_rem; } static const char* page_state_str(page_state ps); inline uint32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp Mon Oct 21 21:26:10 2013 @@ -94,8 +94,8 @@ txn_rec::encode(void* wptr, uint32_t rec assert(max_size_dblks > 0); assert(_xidp != 0 && _txn_hdr._xidsize > 0); - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; - std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES; + std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; + std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES; std::size_t wr_cnt = 0; if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages) { @@ -145,10 +145,10 @@ txn_rec::encode(void* wptr, uint32_t rec { std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize); wr_cnt += wsize; -#ifdef RHM_CLEAN - std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; - std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES; - std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); +#ifdef QLS_CLEAN + std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; + std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES; + std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } rec_offs -= sizeof(_txn_tail) - wsize; @@ -186,9 +186,9 @@ txn_rec::encode(void* wptr, uint32_t rec wr_cnt += _txn_hdr._xidsize; std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail)); wr_cnt += sizeof(_txn_tail); -#ifdef RHM_CLEAN - std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES; - std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); +#ifdef QLS_CLEAN + std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES; + std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt); #endif } } @@ -206,7 +206,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr { const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize); const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize + sizeof(rec_tail_t)); - const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES; + const std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks) { @@ -239,7 +239,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs; std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); rd_cnt += xid_rem; - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem); @@ -249,7 +249,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr else { // Remainder of xid split - const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES); + const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES); std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size); rd_cnt += xid_cp_size; } @@ -288,7 +288,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr // Entire header and xid fit within this page, tail split std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize); rd_cnt += _txn_hdr._xidsize; - const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; + const std::size_t tail_rem = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; if (tail_rem) { std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem); @@ -298,7 +298,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr else { // Header fits within this page, xid split - const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt; + const std::size_t xid_cp_size = (max_size_dblks * QLS_DBLK_SIZE_BYTES) - rd_cnt; std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size); rd_cnt += xid_cp_size; } @@ -357,7 +357,7 @@ txn_rec::rcv_decode(rec_hdr_t h, std::if return false; } } - ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size()); + ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); chk_tail(); // Throws if tail invalid or record incomplete assert(!ifsp->fail() && !ifsp->bad()); return true; Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1534383&r1=1534382&r2=1534383&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Mon Oct 21 21:26:10 2013 @@ -73,7 +73,6 @@ void file_hdr_reset(file_hdr_t* target) target->_ts_nsec = 0; target->_file_number = 0; target->_queue_name_len = 0; - memset(target + sizeof(file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(file_hdr_t)); } int is_file_hdr_reset(file_hdr_t* target) { Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1534383&r1=1534382&r2=1534383&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Mon Oct 21 21:26:10 2013 @@ -26,20 +26,24 @@ #include <cstdlib> #include <cstring> #include "qpid/linearstore/jrnl/utils/file_hdr.h" +#include "qpid/linearstore/jrnl/jcfg.h" #include "qpid/linearstore/jrnl/jcntl.h" #include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/JournalFile.h" #include <sstream> #include <stdint.h> -//#include <iostream> // DEBUG +#include <iostream> // DEBUG namespace qpid { namespace qls_jrnl { -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc): +wmgr::wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc): pmgr(jc, emap, tmap), _lfc(lfc), _max_dtokpp(0), @@ -52,8 +56,13 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn _txn_pending_set() {} -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us): - pmgr(jc, emap, tmap /* , dtoklp */), +wmgr::wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us): + pmgr(jc, emap, tmap), _lfc(lfc), _max_dtokpp(max_dtokpp), _max_io_wait_us(max_iowait_us), @@ -71,9 +80,12 @@ wmgr::~wmgr() } void -wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, - const uint16_t wcache_num_pages, const uint32_t max_dtokpp, const uint32_t max_iowait_us, - std::size_t eo) +wmgr::initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us, + std::size_t eo) { _enq_busy = false; _deq_busy = false; @@ -86,26 +98,38 @@ wmgr::initialize(aio_callback* const cbp if (eo) { - const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS; - uint32_t data_dblks = (eo / JRNL_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr + const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS; + uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr _pg_cntr = data_dblks / wr_pg_size_dblks; _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); } } iores -wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, - const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr, - const std::size_t xid_len, const bool transient, const bool external) +wmgr::enqueue(const void* const data_buff, + const std::size_t tot_data_len, + const std::size_t this_data_len, + data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool transient, + const bool external) { if (xid_len) assert(xid_ptr != 0); - if (_deq_busy || _abort_busy || _commit_busy) - return RHM_IORES_BUSY; + if (_deq_busy || _abort_busy || _commit_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: enqueue while part way through another op:"; + oss << " _deq_busy=" << (_deq_busy?"T":"F"); + oss << " _abort_busy=" << (_abort_busy?"T":"F"); + oss << " _commit_busy=" << (_commit_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } - if (this_data_len != tot_data_len && !external) - return RHM_IORES_NOTIMPL; + if (this_data_len != tot_data_len && !external) { + throw jexception("RHM_IORES_NOTIMPL: partial enqueues not implemented"); // TODO: complete exception; + } iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external); if (res != RHM_IORES_SUCCESS) @@ -124,9 +148,8 @@ wmgr::enqueue(const void* const data_buf } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();/*_wrfc.get_incr_rid()*/ - _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient, - external); + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId(); + _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external); if (!cont) { dtokp->set_rid(rid); @@ -137,14 +160,16 @@ wmgr::enqueue(const void* const data_buf dtokp->clear_xid(); _enq_busy = true; } +//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); +//std::cout << "*" << std::flush; // DEBUG + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -159,6 +184,7 @@ wmgr::enqueue(const void* const data_buf // Is the encoding of this record complete? if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks()) { +//std::cout << "!" << std::flush; // DEBUG // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns. dtokp->set_wstate(data_tok::ENQ_SUBM); dtokp->set_dsize(tot_data_len); @@ -166,7 +192,8 @@ wmgr::enqueue(const void* const data_buf // long multi-page messages have their token on the page containing the END of the // message. AIO callbacks will then only process this token when entire message is // enqueued. - _lfc.incrEnqueuedRecordCount(); + _lfc.incrEnqueuedRecordCount(dtokp->fid()); +//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG if (xid_len) // If part of transaction, add to transaction map { @@ -185,26 +212,37 @@ wmgr::enqueue(const void* const data_buf } done = true; - } - else + } else { +//std::cout << "$" << std::endl << std::flush; // DEBUG dtokp->set_wstate(data_tok::ENQ_PART); + } file_header_check(rid, cont, _enq_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::ENQ_SUBM) _enq_busy = false; +//std::cout << " res=" << iores_str(res) << " _enq_busy=" << (_enq_busy?"T":"F") << std::endl << std::flush; // DEBUG return res; } iores -wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit) +wmgr::dequeue(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool txn_coml_commit) { if (xid_len) assert(xid_ptr != 0); - if (_enq_busy || _abort_busy || _commit_busy) - return RHM_IORES_BUSY; + if (_enq_busy || _abort_busy || _commit_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: dequeue while part way through another op:"; + oss << " _enq_busy=" << (_enq_busy?"T":"F"); + oss << " _abort_busy=" << (_abort_busy?"T":"F"); + oss << " _commit_busy=" << (_commit_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } iores res = pre_write_check(WMGR_DEQUEUE, dtokp); if (res != RHM_IORES_SUCCESS) @@ -224,7 +262,7 @@ wmgr::dequeue(data_tok* dtokp, const voi } const bool ext_rid = dtokp->external_rid(); - uint64_t rid = (ext_rid | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId(); uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid(); _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit); if (!cont) @@ -242,14 +280,16 @@ wmgr::dequeue(data_tok* dtokp, const voi dtokp->set_dblocks_written(0); // Reset dblks_written from previous op _deq_busy = true; } +//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); +//std::cout << "*" << std::flush; // DEBUG + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -264,6 +304,7 @@ wmgr::dequeue(data_tok* dtokp, const voi // Is the encoding of this record complete? if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks()) { +//std::cout << "!" << std::flush; // DEBUG // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns. dtokp->set_wstate(data_tok::DEQ_SUBM); @@ -276,7 +317,7 @@ wmgr::dequeue(data_tok* dtokp, const voi } else { - int16_t fid; + uint64_t fid; short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid); if (eres < enq_map::EMAP_OK) // fail { @@ -293,30 +334,43 @@ wmgr::dequeue(data_tok* dtokp, const voi throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } - _lfc.decrEnqueuedRecordCount(); +//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG +//try { + _lfc.decrEnqueuedRecordCount(fid); +//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; } } done = true; - } - else + } else { +//std::cout << "$" << std::flush; // DEBUG dtokp->set_wstate(data_tok::DEQ_PART); + } file_header_check(rid, cont, _deq_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::DEQ_SUBM) _deq_busy = false; +//std::cout << " res=" << iores_str(res) << " _deq_busy=" << (_deq_busy?"T":"F") << std::endl << std::flush; // DEBUG return res; } iores -wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len) +wmgr::abort(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len) { // commit and abort MUST have a valid xid assert(xid_ptr != 0 && xid_len > 0); - if (_enq_busy || _deq_busy || _commit_busy) - return RHM_IORES_BUSY; + if (_enq_busy || _deq_busy || _commit_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: abort while part way through another op:"; + oss << " _enq_busy=" << (_enq_busy?"T":"F"); + oss << " _deq_busy=" << (_deq_busy?"T":"F"); + oss << " _commit_busy=" << (_commit_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } iores res = pre_write_check(WMGR_ABORT, dtokp); if (res != RHM_IORES_SUCCESS) @@ -348,11 +402,11 @@ wmgr::abort(data_tok* dtokp, const void* bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) @@ -392,7 +446,7 @@ wmgr::abort(data_tok* dtokp, const void* dtokp->set_wstate(data_tok::ABORT_PART); file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::ABORT_SUBM) _abort_busy = false; @@ -400,13 +454,21 @@ wmgr::abort(data_tok* dtokp, const void* } iores -wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len) +wmgr::commit(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len) { // commit and abort MUST have a valid xid assert(xid_ptr != 0 && xid_len > 0); - if (_enq_busy || _deq_busy || _abort_busy) - return RHM_IORES_BUSY; + if (_enq_busy || _deq_busy || _abort_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: commit while part way through another op:"; + oss << " _enq_busy=" << (_enq_busy?"T":"F"); + oss << " _deq_busy=" << (_deq_busy?"T":"F"); + oss << " _abort_busy=" << (_abort_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } iores res = pre_write_check(WMGR_COMMIT, dtokp); if (res != RHM_IORES_SUCCESS) @@ -438,11 +500,11 @@ wmgr::commit(data_tok* dtokp, const void bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) @@ -475,7 +537,7 @@ wmgr::commit(data_tok* dtokp, const void } else // txn dequeue { - int16_t fid; + uint64_t fid; short eres = _emap.get_remove_pfid(itr->_drid, fid, true); if (eres < enq_map::EMAP_OK) // fail { @@ -509,7 +571,7 @@ wmgr::commit(data_tok* dtokp, const void dtokp->set_wstate(data_tok::COMMIT_PART); file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::COMMIT_SUBM) _commit_busy = false; @@ -517,29 +579,34 @@ wmgr::commit(data_tok* dtokp, const void } void -wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem) +wmgr::file_header_check(const uint64_t rid, + const bool cont, + const uint32_t rec_dblks_rem) { if (_lfc.isEmpty()) // File never written (i.e. no header or data) { std::size_t fro = 0; if (cont) { - bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will fit within this journal file - bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will exactly fill this journal file + bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file + bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will exactly fill this journal file if (file_fit && !file_full) { - fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS)) * JRNL_DBLK_SIZE_BYTES; + fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS)) * QLS_DBLK_SIZE_BYTES; } } else { - fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES; + fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; } _lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro); + _aio_evt_rem++; } } void -wmgr::flush_check(iores& res, bool& cont, bool& done) +wmgr::flush_check(iores& res, + bool& cont, + bool& done, const uint64_t /*rid*/) // DEBUG { // Is page is full, flush - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) + if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) { res = write_flush(); assert(res == RHM_IORES_SUCCESS); @@ -558,6 +625,7 @@ wmgr::flush_check(iores& res, bool& cont if (!done) { cont = true; } +//std::cout << "***** wmgr::flush_check(): GET NEXT FILE: rid=0x" << std::hex << rid << std::dec << " res=" << iores_str(res) << " cont=" << (cont?"T":"F") << " done=" << (done?"T":"F") << std::endl; // DEBUG } } } @@ -580,28 +648,28 @@ wmgr::write_flush() // Don't bother flushing an empty page or one that is still in state AIO_PENDING if (_cached_offset_dblks) { - if (_page_cb_arr[_pg_index]._state == AIO_PENDING) + if (_page_cb_arr[_pg_index]._state == AIO_PENDING) { +//std::cout << "#"; // DEBUG res = RHM_IORES_PAGE_AIOWAIT; - else - { + } else { if (_page_cb_arr[_pg_index]._state != IN_USE) { std::ostringstream oss; oss << "pg_index=" << _pg_index << " state=" << _page_cb_arr[_pg_index].state_str(); - throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", - "write_flush"); + throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "write_flush"); } // Send current page using AIO - // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") - // if necessary. + // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") if necessary. dblk_roundup(); - std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE_BYTES; + std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * QLS_DBLK_SIZE_BYTES; aio_cb* aiocbp = &_aio_cb_arr[_pg_index]; _lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks); + _page_cb_arr[_pg_index]._state = AIO_PENDING; _aio_evt_rem++; +//std::cout << "." << _aio_evt_rem << std::flush; // DEBUG _cached_offset_dblks = 0; _jc->instr_incr_outstanding_aio_cnt(); @@ -610,7 +678,7 @@ wmgr::write_flush() _page_cb_arr[_pg_index]._state = IN_USE; } } - get_events(UNUSED, 0); + get_events(0, false); if (_page_cb_arr[_pg_index]._state == UNUSED) _page_cb_arr[_pg_index]._state = IN_USE; return res; @@ -620,17 +688,19 @@ void wmgr::get_next_file() { _pg_cntr = 0; +//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG _lfc.pullEmptyFileFromEfp(); } int32_t -wmgr::get_events(page_state state, timespec* const timeout, bool flush) +wmgr::get_events(timespec* const timeout, + bool flush) { if (_aio_evt_rem == 0) // no events to get return 0; int ret = 0; - if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0) + if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem, _aio_event_arr, timeout)) < 0) { if (ret == -EINTR) // Interrupted by signal return 0; @@ -652,6 +722,7 @@ wmgr::get_events(page_state state, times throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events"); } _aio_evt_rem--; +//std::cout << "'" << _aio_evt_rem; // DEBUG aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb) page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb) long aioret = (long)_aio_event_arr[i].res; @@ -671,6 +742,7 @@ wmgr::get_events(page_state state, times } if (pcbp) // Page writes have pcb { +//std::cout << "p"; // DEBUG uint32_t s = pcbp->_pdtokl->size(); std::vector<data_tok*> dtokl; dtokl.reserve(s); @@ -754,7 +826,8 @@ wmgr::get_events(page_state state, times // Clean up this pcb's data_tok list pcbp->_pdtokl->clear(); - pcbp->_state = state; + pcbp->_state = UNUSED; +//std::cout << "c" << pcbp->_index << pcbp->state_str(); // DEBUG // Perform AIO return callback if (_cbp && tot_data_toks) @@ -762,10 +835,10 @@ wmgr::get_events(page_state state, times } else // File header writes have no pcb { +//std::cout << "f"; // DEBUG file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf; - _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS); + _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); _lfc.decrOutstandingAioOperationCount(fhp->_file_number); - //fcntlp->set_wr_fhdr_aio_outstanding(false); // TODO: Do we need this? } } @@ -784,7 +857,9 @@ wmgr::is_txn_synced(const std::string& x } void -wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages) +wmgr::initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages) { pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); @@ -796,9 +871,11 @@ wmgr::initialize(aio_callback* const cbp } iores -wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, - const std::size_t /*xidsize*/, const std::size_t /*dsize*/, const bool /*external*/ - ) const +wmgr::pre_write_check(const _op_type op, + const data_tok* const dtokp, + const std::size_t /*xidsize*/, + const std::size_t /*dsize*/, + const bool /*external*/) const { // Check status of current file // TODO: Replace for LFC @@ -861,11 +938,12 @@ wmgr::pre_write_check(const _op_type op, } void -wmgr::dequeue_check(const std::string& xid, const uint64_t drid) +wmgr::dequeue_check(const std::string& xid, + const uint64_t drid) { // First check emap bool found = false; - int16_t fid; + uint64_t fid; short eres = _emap.get_pfid(drid, fid); if (eres < enq_map::EMAP_OK) { // fail if (eres == enq_map::EMAP_RID_NOT_FOUND) { @@ -891,13 +969,13 @@ void wmgr::dblk_roundup() { const uint32_t xmagic = QLS_EMPTY_MAGIC; - uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS; + uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS; while (_cached_offset_dblks < wdblks) { - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic)); -#ifdef RHM_CLEAN - std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic)); +#ifdef QLS_CLEAN + std::memset((char*)wptr + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic)); #endif _pg_offset_dblks++; _cached_offset_dblks++; @@ -907,14 +985,15 @@ wmgr::dblk_roundup() void wmgr::rotate_page() { - _page_cb_arr[_pg_index]._state = AIO_PENDING; - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) +//std::cout << "^^^^^ wmgr::rotate_page() " << status_str() << " pi=" << _pg_index; // DEBUG + if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) { _pg_offset_dblks = 0; _pg_cntr++; } if (++_pg_index >= _cache_num_pages) _pg_index = 0; +//std::cout << "->" << _pg_index << std::endl; // DEBUG } void @@ -928,7 +1007,7 @@ wmgr::status_str() const std::ostringstream oss; oss << "wmgr: pi=" << _pg_index << " pc=" << _pg_cntr; oss << " po=" << _pg_offset_dblks << " aer=" << _aio_evt_rem; - oss << " edac:" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F"); + oss << " edac=" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F"); oss << (_abort_busy?"T":"F") << (_commit_busy?"T":"F"); oss << " ps=["; for (int i=0; i<_cache_num_pages; i++) @@ -938,11 +1017,10 @@ wmgr::status_str() const case UNUSED: oss << "-"; break; case IN_USE: oss << "U"; break; case AIO_PENDING: oss << "A"; break; - case AIO_COMPLETE: oss << "*"; break; default: oss << _page_cb_arr[i]._state; } } - oss << "] " << _lfc.status(0); + oss << "] "; return oss.str(); } Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h?rev=1534383&r1=1534382&r2=1534383&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h Mon Oct 21 21:26:10 2013 @@ -84,22 +84,45 @@ namespace qls_jrnl std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts public: - wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc); - wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us); + wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc); + wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us); virtual ~wmgr(); - void initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, - const uint16_t wcache_num_pages, const uint32_t max_dtokpp, - const uint32_t max_iowait_us, std::size_t eo = 0); - iores enqueue(const void* const data_buff, const std::size_t tot_data_len, - const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr, - const std::size_t xid_len, const bool transient, const bool external); - iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, - const bool txn_coml_commit); - iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len); - iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len); + void initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us, + std::size_t eo = 0); + iores enqueue(const void* const data_buff, + const std::size_t tot_data_len, + const std::size_t this_data_len, + data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool transient, + const bool external); + iores dequeue(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool txn_coml_commit); + iores abort(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len); + iores commit(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len); iores flush(); - int32_t get_events(page_state state, timespec* const timeout, bool flush = false); + int32_t get_events(timespec* const timeout, + bool flush); bool is_txn_synced(const std::string& xid); inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; } inline uint32_t unflushed_dblks() { return _cached_offset_dblks; } @@ -108,14 +131,22 @@ namespace qls_jrnl const std::string status_str() const; private: - void initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, - const uint16_t wcache_num_pages); - iores pre_write_check(const _op_type op, const data_tok* const dtokp, - const std::size_t xidsize = 0, const std::size_t dsize = 0, const bool external = false) - const; - void dequeue_check(const std::string& xid, const uint64_t drid); - void file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem); - void flush_check(iores& res, bool& cont, bool& done); + void initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages); + iores pre_write_check(const _op_type op, + const data_tok* const dtokp, + const std::size_t xidsize = 0, + const std::size_t dsize = 0, + const bool external = false) const; + void dequeue_check(const std::string& xid, + const uint64_t drid); + void file_header_check(const uint64_t rid, + const bool cont, + const uint32_t rec_dblks_rem); + void flush_check(iores& res, + bool& cont, + bool& done, const uint64_t rid); iores write_flush(); void get_next_file(); void dblk_roundup(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
