Author: kpvdr
Date: Tue Nov  4 14:58:47 2014
New Revision: 1636598

URL: http://svn.apache.org/r1636598
Log:
QPID-5671: [linearstore] Add ability to use disk partitions and select 
per-queue EFPs. This is the first part of resolving this issue and changes the 
journal directory structure to use symlinks instead of moving the actual files. 
This opens the way to having files from multiple partitions in the same 
journal. Other small improvements and tidy-ups also included.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Tue Nov  4 14:58:47 2014
@@ -25,31 +25,24 @@ Current/pending:
  ------ -------  ----------------------
    5359 -        Linearstore: Implement new management schema and wire into store
    5360 -        Linearstore: Evaluate and rework logging to produce a consistent log output
-   5361 -        Linearstore: No tests for linearstore functionality currently exist
+   5361 1145359  Linearstore: No tests for linearstore functionality currently exist
                    svn r.1564893 2014-02-05: Added tx-test-soak.sh
                    svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh
+                   svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore
                    * No existing tests for linearstore:
                    ** Basic broker-level tests for txn and non-txn recovery
                    ** Store-level tests which check write boundary conditions
                    ** EFP tests, including file recovery, error management
                    ** Unit tests
                    ** Basic performance tests
-   5362 -        Linearstore: No store tools exist for examining the journals
-                   svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
-                   svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
-                   svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
-                   svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
-                   svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
-                   svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
-                   * Store analysis and status
-                   * Recovery/reading of message content
-                   * Empty file pool status and management
    5464 -        [linearstore] Incompletely created journal files accumulate in EFP
-   -    1088944  [Linearstore] store does not return all files to EFP after purging big queue
-   -    1078937  [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
-                   svn r.1596633 2014-05-21: Modified to run from installed location
-*  6043 1066256  [LinearStore] changing efp size after using store broke the new durable nodes creation
-*  -    1089652  [RFE]: Configuration option for linear store to delete the used journal files instead of recycling them.
+   -    1088944  [Linearstore] store does not return all files to EFP after purging big queue <queue purge issue>
+   6043 1066256  [LinearStore] changing efp size after using store broke the new durable nodes creation
+   -    1067480  [LinearStore] Provide a way to limit max count/size of empty files in EFP
+   -    1067429  [LinearStore] last file from deleted queue is not moved to EFP  <queue delete issue>
+   -    1067482  [LinearStore] Provide a way to prealocate empty pages in EFP
+   5671          [linearstore] Add ability to use disk partitions and select per-queue EFPs
+
 
 
 Fixed/closed:
@@ -118,12 +111,24 @@ NO-JIRA -        Added missing Apache co
    5651       -  [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue
                    svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec
                    * Bug introduced by r.1578899.
+   5362 1145363  Linearstore: No store tools exist for examining the journals
+                   svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+                   svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+                   svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+                   svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
+                   svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
+                   svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
+                   * Store analysis and status
+                   * Recovery/reading of message content
+                   * Empty file pool status and management
    5661       -  [linearstore] Set default cmake build to exclude linearstore
                    svn r.1584379 2014-04-03 Proposed solution.
                    * Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build.
    5750 1078142  [linearstore] qpidd closes connection with (distributed) transactional client while checking previous transaction, broker signals error (closed by error: Queue Ve0-2: async_dequeue() failed: exception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error)
                    svn r.1594215 2014-05-13 Proposed solution.
                    * jexception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error. (AIO write operation failed: Invalid argument (-22) [pg=0 size=8192 offset=4096 fh=22])
+   5655 1078937  [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
+                   svn r.1596633 2014-05-21: Modified to run from installed location
    5767 1098118  [linearstore] broker segfaults when recovering journal file with damaged header
                    svn r.1596509 2014-05-21 Proposed solution (committed by pmoravec)
                    svn r.1599243 2014-06-02 Solution to additional case of file header corruption
@@ -134,6 +139,10 @@ NO-JIRA -        Added missing Apache co
                    This turned out to be an AMQP error, fix does not affect store code.
    6043 1089652  [RFE]: Configuration option for linear store to delete or overwrite the used journal files.
                    svn r.1620426 2014-08-25 Proposed solution
+   6147 1152012  [C++ broker linearstore] missing journal id in "trace Mgmt create journal." log
+                   svn r.1631360 2014-10-13 Proposed solution
+   6157 1150397  linearstore: segfault when 2 journals request new journal file from empty EFP
+                   svn r.1632504 2014-10-17 Proposed solution by pmoravec
                    
 
 Ordered checkin list:
@@ -175,6 +184,8 @@ no.   svn r  Q-JIRA     RHBZ       Date 
 30. 1599243    5767  1098118 2014-06-02
 31. 1614665    5924  1124906 2014-07-30
 32. 1620426    6043  1089652 2014-08-25
+33. 1631360    6147  1152012 2014-10-13 (pmoravec)
+34. 1632504    6157  1150397 2014-10-17 (pmoravec)
 
 See above sections for details on these checkins.
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Tue Nov  4 14:58:47 2014
@@ -201,8 +201,7 @@ bool MessageStoreImpl::init(const std::s
 
     if (truncateFlag_)
         truncateInit();
-    else
-        init();
+    init(truncateFlag_);
 
     QLS_LOG(info, "Store module initialized; store-dir=" << storeDir_);
     QLS_LOG(info,   "> Default EFP partition: " << defaultEfpPartitionNumber);
@@ -218,7 +217,7 @@ bool MessageStoreImpl::init(const std::s
     return isInit;
 }
 
-void MessageStoreImpl::init()
+void MessageStoreImpl::init(const bool truncateFlag)
 {
     const int retryMax = 3;
     int bdbRetryCnt = 0;
@@ -296,6 +295,7 @@ void MessageStoreImpl::init()
                                                           defaultEfpPartitionNumber,
                                                           defaultEfpFileSize_kib,
                                                           overwriteBeforeReturnFlag,
+                                                          truncateFlag,
                                                           jrnlLog));
     efpMgr->findEfpPartitions();
 }
@@ -337,13 +337,12 @@ void MessageStoreImpl::truncateInit()
         isInit = false;
     }
 
-    // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
-
     qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir());
+
+    // TODO: Linearstore: harvest all discarded journal files into the empty file pool(s).
     qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir());
     qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir());
     QLS_LOG(info, "Store directory " << getStoreTopLevelDir() << " was truncated.");
-    init();
 }
 
 void MessageStoreImpl::chkTplStoreInit()

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Tue Nov  4 14:58:47 2014
@@ -157,7 +157,7 @@ class MessageStoreImpl : public qpid::br
     static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB,
                                                               const std::string& paramName);
 
-    void init();
+    void init(const bool truncateFlag);
 
     void recoverQueues(TxnCtxt& txn,
                        qpid::broker::RecoveryManager& recovery,

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp Tue Nov  4 14:58:47 2014
@@ -33,37 +33,53 @@
 #include <unistd.h>
 #include <vector>
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
 
+// static
+std::string EmptyFilePool::s_inuseFileDirectory_ = "in_use";
+
+// static
+std::string EmptyFilePool::s_returnedFileDirectory_ = "returned";
+
 EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
                              const EmptyFilePoolPartition* partitionPtr,
+                             const bool overwriteBeforeReturnFlag,
+                             const bool truncateFlag,
                              JournalLog& journalLogRef) :
                 efpDirectory_(efpDirectory),
                 efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())),
                 partitionPtr_(partitionPtr),
+                overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+                truncateFlag_(truncateFlag),
                 journalLogRef_(journalLogRef)
 {}
 
 EmptyFilePool::~EmptyFilePool() {}
 
 void EmptyFilePool::initialize() {
+//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG
     std::vector<std::string> dirList;
+
+    // Process empty files in main dir
     jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
         size_t dotPos = i->rfind(".");
         if (dotPos != std::string::npos) {
             if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
-                std::string emptyFile(efpDirectory_ + "/" + (*i));
-                if (validateEmptyFile(emptyFile)) {
-                    pushEmptyFile(emptyFile);
+                std::string emptyFileName(efpDirectory_ + "/" + (*i));
+                if (validateEmptyFile(emptyFileName)) {
+                    pushEmptyFile(emptyFileName);
                 }
             }
         }
     }
+
+    // Create 'in_use' and 'returned' subdirs if they don't already exist
+    // Return files to EFP in 'in_use' and 'returned' subdirs if they do exist
+    initializeSubDirectory(efpDirectory_ + "/" + s_inuseFileDirectory_);
+    initializeSubDirectory(efpDirectory_ + "/" + s_returnedFileDirectory_);
 }
 
 efpDataSize_kib_t EmptyFilePool::dataSize_kib() const {
@@ -106,36 +122,29 @@ const efpIdentity_t EmptyFilePool::getId
 
 std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
     std::string emptyFileName = popEmptyFile();
-    std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
-    if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+    std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/'));
+    std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+    if (moveFile(emptyFileName, newFileName)) {
         // Try again with new UUID for file name
-        newFileName = destDirectory + "/" + getEfpFileName();
-        if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+        newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName();
+        if (moveFile(emptyFileName, newFileName)) {
+//std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from  EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
             pushEmptyFile(emptyFileName);
             std::ostringstream oss;
             oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName << "\"" << FORMAT_SYSERR(errno);
             throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
         }
     }
-    return newFileName;
+    if (createSymLink(newFileName, symlinkName)) {
+        std::ostringstream oss;
+        oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
+    }
+    return symlinkName;
 }
 
-void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) {
-    std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/'
-    if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
-        // Try again with new UUID for file name
-        emptyFileName = efpDirectory_ + "/" + getEfpFileName();
-        if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
-            // Failed twice in a row - delete file
-            ::unlink(fqSrcFile.c_str());
-            return;
-        }
-    }
-    resetEmptyFileHeader(emptyFileName);
-    if (partitionPtr_->getOverwriteBeforeReturnFlag()) {
-        overwriteFileContents(emptyFileName);
-    }
-    pushEmptyFile(emptyFileName);
+void EmptyFilePool::returnEmptyFileSymlink(const std::string& emptyFileSymlink) {
+    returnEmptyFile(deleteSymlink(emptyFileSymlink));
 }
 
 //static
@@ -173,12 +182,12 @@ efpDataSize_kib_t EmptyFilePool::dataSiz
 
 // --- protected functions ---
 
-// WARNING: this method needs to be called under the scope of emptyFileListMutex_ lock
-void EmptyFilePool::createEmptyFile() {
+std::string EmptyFilePool::createEmptyFile() {
     std::string efpfn = getEfpFileName();
-    if (overwriteFileContents(efpfn)) {
-        emptyFileList_.push_back(efpfn);
+    if (!overwriteFileContents(efpfn)) {
+        // TODO: handle failure to prepare new file here
     }
+    return efpfn;
 }
 
 std::string EmptyFilePool::getEfpFileName() {
@@ -188,6 +197,27 @@ std::string EmptyFilePool::getEfpFileNam
     return oss.str();
 }
 
+void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) {
+    std::vector<std::string> dirList;
+    if (jdir::exists(fqDirName)) {
+        if (truncateFlag_) {
+            jdir::read_dir(fqDirName, dirList, false, true, false, false);
+            for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+                size_t dotPos = i->rfind(".");
+                if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
+                    returnEmptyFile(fqDirName + "/" + (*i));
+                } else {
+                    std::ostringstream oss;
+                    oss << "File \'" << *i << "\' was not a journal file and was not returned to EFP.";
+                    journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+                }
+            }
+        }
+    } else {
+        jdir::create_dir(fqDirName);
+    }
+}
+
 bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
     ::file_hdr_t fh;
     ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
@@ -199,22 +229,27 @@ bool EmptyFilePool::overwriteFileContent
             ofs.put('\0');
         ofs.close();
         return true;
-//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
+//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
     } else {
-//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\"" << std::endl; // DEBUG
     }
     return false;
 }
 
 std::string EmptyFilePool::popEmptyFile() {
     std::string emptyFileName;
+    bool listEmptyFlag;
     {
         slock l(emptyFileListMutex_);
-        if (emptyFileList_.empty()) {
-            createEmptyFile();
+        listEmptyFlag = emptyFileList_.empty();
+        if (!listEmptyFlag) {
+            emptyFileName = emptyFileList_.front();
+            emptyFileList_.pop_front();
         }
-        emptyFileName = emptyFileList_.front();
-        emptyFileList_.pop_front();
+    }
+    // If the list is empty, create a new file and return the file name.
+    if (listEmptyFlag) {
+        emptyFileName = createEmptyFile();
     }
     return emptyFileName;
 }
@@ -224,6 +259,28 @@ void EmptyFilePool::pushEmptyFile(const 
     emptyFileList_.push_back(fqFileName);
 }
 
+void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
+    std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+    if (moveFile(emptyFileName, returnedFileName)) {
+        ::unlink(emptyFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG
+    }
+
+    // TODO: On a separate thread, process returned files by overwriting headers and, optionally, their contents and
+    // returning them to the EFP directory
+    resetEmptyFileHeader(returnedFileName);
+    if (overwriteBeforeReturnFlag_) {
+        overwriteFileContents(returnedFileName);
+    }
+    std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/'
+    if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
+        ::unlink(returnedFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG
+    } else {
+        pushEmptyFile(sanitizedEmptyFileName);
+    }
+}
+
 void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
     std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
     if (fs.good()) {
@@ -239,14 +296,14 @@ void EmptyFilePool::resetEmptyFileHeader
             fs.write(buff, buffsize);
             std::streampos bytesWritten = fs.tellp();
             if (std::streamoff(bytesWritten) != buffsize) {
-//std::cerr << "ERROR: Unable to write file header of file \"" << fqFileName_ << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to write file header of file \"" << fqFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl; // DEBUG
             }
         } else {
-//std::cerr << "ERROR: Unable to read file header of file \"" << fqFileName_ << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to read file header of file \"" << fqFileName << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl; // DEBUG
         }
         fs.close();
     } else {
-//std::cerr << "ERROR: Unable to open file \"" << fqFileName_ << "\" for reading" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\" for reading" << std::endl; // DEBUG
     }
 }
 
@@ -329,7 +386,7 @@ bool EmptyFilePool::validateEmptyFile(co
 }
 
 // static
-int EmptyFilePool::moveEmptyFile(const std::string& from,
+int EmptyFilePool::moveFile(const std::string& from,
                                  const std::string& to) {
     if (::rename(from.c_str(), to.c_str())) {
         if (errno == EEXIST) return errno; // File name exists
@@ -340,4 +397,29 @@ int EmptyFilePool::moveEmptyFile(const s
     return 0;
 }
 
+//static
+int EmptyFilePool::createSymLink(const std::string& fqFileName,
+                                 const std::string& fqLinkName) {
+    if(::symlink(fqFileName.c_str(), fqLinkName.c_str())) {
+        if (errno == EEXIST) return errno; // File name exists
+        std::ostringstream oss;
+        oss << "file=\"" << fqFileName << "\" symlink=\"" <<  fqLinkName << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
+    }
+    return 0;
+}
+
+//static
+std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) {
+    char buff[1024];
+    ssize_t len = ::readlink(fqLinkName.c_str(), buff, 1024);
+    if (len < 0) {
+        std::ostringstream oss;
+        oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
+    }
+    ::unlink(fqLinkName.c_str());
+    return std::string(buff, len);
+}
+
 }}}
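
The popEmptyFile() change in this file narrows the lock scope: the list is
inspected and popped under the mutex, and a replacement file is created only
after the lock has been released. A minimal sketch of that pattern follows,
assuming C++11 std::mutex in place of the store's slock; FilePool and
createFile are hypothetical stand-ins, and createFile only fabricates a name
where the real code would write a file to disk.

```cpp
#include <atomic>
#include <deque>
#include <mutex>
#include <string>

class FilePool {
public:
    FilePool() : counter_(0) {}

    void push(const std::string& name) {
        std::lock_guard<std::mutex> l(mutex_);
        files_.push_back(name);
    }

    std::string pop() {
        std::string name;
        bool wasEmpty;
        {
            // Hold the lock only while touching the shared list.
            std::lock_guard<std::mutex> l(mutex_);
            wasEmpty = files_.empty();
            if (!wasEmpty) {
                name = files_.front();
                files_.pop_front();
            }
        }
        // Slow path runs unlocked, so other threads can keep using the list.
        if (wasEmpty) {
            name = createFile();
        }
        return name;
    }

private:
    // Stand-in for slow disk I/O; the atomic counter keeps names unique.
    std::string createFile() {
        return "new-" + std::to_string(++counter_);
    }

    std::mutex mutex_;
    std::deque<std::string> files_;
    std::atomic<unsigned> counter_;
};
```

The freshly created name is handed straight to the caller and never enters the
shared list, so no second lock acquisition is needed on the slow path.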

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h Tue Nov  4 14:58:47 2014
@@ -44,11 +44,16 @@ class EmptyFilePool
 {
 protected:
     typedef std::deque<std::string> emptyFileList_t;
-    typedef emptyFileList_t::iterator emptyFileListItr_t;
+    typedef emptyFileList_t::const_iterator emptyFileListConstItr_t;
+
+    static std::string s_inuseFileDirectory_;
+    static std::string s_returnedFileDirectory_;
 
     const std::string efpDirectory_;
     const efpDataSize_kib_t efpDataSize_kib_;
     const EmptyFilePoolPartition* partitionPtr_;
+    const bool overwriteBeforeReturnFlag_;
+    const bool truncateFlag_;
     JournalLog& journalLogRef_;
 
 private:
@@ -58,6 +63,8 @@ private:
 public:
     EmptyFilePool(const std::string& efpDirectory,
                   const EmptyFilePoolPartition* partitionPtr,
+                  const bool overwriteBeforeReturnFlag,
+                  const bool truncateFlag,
                   JournalLog& journalLogRef);
     virtual ~EmptyFilePool();
 
@@ -73,23 +80,28 @@ public:
     const efpIdentity_t getIdentity() const;
 
     std::string takeEmptyFile(const std::string& destDirectory);
-    void returnEmptyFile(const std::string& srcFile);
+    void returnEmptyFileSymlink(const std::string& emptyFileSymlink);
 
     static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib);
     static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName,
                                                      const efpPartitionNumber_t partitionNumber);
 
 protected:
-    void createEmptyFile();
+    std::string createEmptyFile();
     std::string getEfpFileName();
-    std::string popEmptyFile();
+    void initializeSubDirectory(const std::string& fqDirName);
     bool overwriteFileContents(const std::string& fqFileName);
+    std::string popEmptyFile();
     void pushEmptyFile(const std::string fqFileName);
+    void returnEmptyFile(const std::string& emptyFileName);
     void resetEmptyFileHeader(const std::string& fqFileName);
     bool validateEmptyFile(const std::string& emptyFileName) const;
 
-    static int moveEmptyFile(const std::string& fromFqPath,
-                             const std::string& toFqPath);
+    static int moveFile(const std::string& fromFqPath,
+                        const std::string& toFqPath);
+    static int createSymLink(const std::string& fqFileName,
+                             const std::string& fqLinkName);
+    static std::string deleteSymlink(const std::string& fqLinkName);
 };
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp Tue Nov  4 14:58:47 2014
@@ -27,8 +27,6 @@
 #include "qpid/linearstore/journal/JournalLog.h"
 #include "qpid/linearstore/journal/slock.h"
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
@@ -37,11 +35,13 @@ EmptyFilePoolManager::EmptyFilePoolManag
                                            const efpPartitionNumber_t defaultPartitionNumber,
                                            const efpDataSize_kib_t defaultEfpDataSize_kib,
                                            const bool overwriteBeforeReturnFlag,
+                                           const bool truncateFlag,
                                            JournalLog& journalLogRef) :
                 qlsStorePath_(qlsStorePath),
                 defaultPartitionNumber_(defaultPartitionNumber),
                 defaultEfpDataSize_kib_(defaultEfpDataSize_kib),
                 overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+                truncateFlag_(truncateFlag),
                 journalLogRef_(journalLogRef)
 {}
 
@@ -63,20 +63,7 @@ void EmptyFilePoolManager::findEfpPartit
             efpPartitionNumber_t pn = EmptyFilePoolPartition::getPartitionNumber(*i);
             if (pn > 0) { // valid partition name found
                 std::string fullDirPath(qlsStorePath_ + "/" + (*i));
-                EmptyFilePoolPartition* efppp = 0;
-                try {
-                    efppp = new EmptyFilePoolPartition(pn, fullDirPath, overwriteBeforeReturnFlag_, journalLogRef_);
-                    {
-                        slock l(partitionMapMutex_);
-                        partitionMap_[pn] = efppp;
-                    }
-                } catch (const std::exception& e) {
-                    if (efppp != 0) {
-                        delete efppp;
-                        efppp = 0;
-                    }
-//std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; // DEBUG
-                }
+                EmptyFilePoolPartition* efppp = insertPartition(pn, fullDirPath);
                 if (efppp != 0)
                     efppp->findEmptyFilePools();
                 foundPartition = true;
@@ -85,37 +72,28 @@ void EmptyFilePoolManager::findEfpPartit
 
         // If no partition was found, create an empty default partition.
         if (!foundPartition) {
-            journalLogRef_.log(JournalLog::LOG_INFO, "No EFP partition found, creating an empty partition.");
-            std::ostringstream oss;
-            oss << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
-                << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
-            jdir::create_dir(oss.str());
+            std::ostringstream oss1;
+            oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+                << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
+            jdir::create_dir(oss1.str());
+            insertPartition(defaultPartitionNumber_, oss1.str());
+            std::ostringstream oss2;
+            oss2 << "No EFP partition found, creating an empty partition at " << oss1.str();
+            journalLogRef_.log(JournalLog::LOG_INFO, oss2.str());
         }
     }
 
     journalLogRef_.log(JournalLog::LOG_INFO, "EFP Manager initialization 
complete");
     std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*> 
partitionList;
-    std::vector<qpid::linearstore::journal::EmptyFilePool*> filePoolList;
     getEfpPartitions(partitionList);
     if (partitionList.size() == 0) {
         journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible.");
     } else {
         std::stringstream oss;
-        oss << "> EFP Partitions found: " << partitionList.size();
+        oss << "EFP Partitions found: " << partitionList.size();
         journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
         for (std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
-            filePoolList.clear();
-            (*i)->getEmptyFilePools(filePoolList);
-            std::stringstream oss;
-            oss << "  * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size()
-                << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'";
-            journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
-            for (std::vector<qpid::linearstore::journal::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
-                std::ostringstream oss;
-                oss << "    - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
-                              " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB";
-            journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
-            }
+            journalLogRef_.log(JournalLog::LOG_INFO, (*i)->toString(5U));
         }
     }
 }
@@ -210,4 +188,22 @@ uint16_t EmptyFilePoolManager::getNumEfp
     return partitionMap_.size();
 }
 
+EmptyFilePoolPartition* EmptyFilePoolManager::insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath) {
+    EmptyFilePoolPartition* efppp = 0;
+    try {
+        efppp = new EmptyFilePoolPartition(pn, fullPartitionPath, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
+        {
+            slock l(partitionMapMutex_);
+            partitionMap_[pn] = efppp;
+        }
+    } catch (const std::exception& e) {
+        if (efppp != 0) {
+            delete efppp;
+            efppp = 0;
+        }
+//std::cerr << "*** Unable to initialize partition " << pn << " (\'" << fullPartitionPath << "\'): " << e.what() << std::endl; // DEBUG
+    }
+    return efppp;
+}
+
 }}}
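
Note on insertPartition(): the helper added above factors the "construct, then register under the map lock" step out of findEfpPartitions(). A minimal standalone sketch of that pattern follows. It is an illustration only, not the linearstore code: Partition and PartitionManager are hypothetical stand-ins, std::mutex replaces smutex/slock, and std::unique_ptr replaces the raw new/delete.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for EmptyFilePoolPartition: the constructor
// validates its directory and throws if it is unusable.
class Partition {
public:
    Partition(uint16_t num, const std::string& dir) : num_(num), dir_(dir) {
        if (dir_.empty()) throw std::runtime_error("invalid partition directory");
    }
    uint16_t number() const { return num_; }
    const std::string& directory() const { return dir_; }
private:
    uint16_t num_;
    std::string dir_;
};

// Hypothetical stand-in for EmptyFilePoolManager.
class PartitionManager {
public:
    // Returns the registered partition, or nullptr if construction failed.
    // The map is only modified while the mutex is held.
    Partition* insertPartition(uint16_t pn, const std::string& dir) {
        try {
            std::unique_ptr<Partition> p(new Partition(pn, dir));
            Partition* raw = p.get();
            std::lock_guard<std::mutex> l(mutex_);
            map_[pn] = std::move(p);
            return raw;
        } catch (const std::exception&) {
            return nullptr; // the caller decides how to report the failure
        }
    }
    std::size_t size() const {
        std::lock_guard<std::mutex> l(mutex_);
        return map_.size();
    }
private:
    mutable std::mutex mutex_;
    std::map<uint16_t, std::unique_ptr<Partition> > map_;
};
```

In this sketch a failed construction never reaches the map, so callers only need the null check that findEfpPartitions() already performs before calling findEmptyFilePools().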

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h Tue Nov  4 14:58:47 2014
@@ -46,6 +46,7 @@ protected:
     const efpPartitionNumber_t defaultPartitionNumber_;
     const efpDataSize_kib_t defaultEfpDataSize_kib_;
     const bool overwriteBeforeReturnFlag_;
+    const bool truncateFlag_;
     JournalLog& journalLogRef_;
     partitionMap_t partitionMap_;
     smutex partitionMapMutex_;
@@ -55,6 +56,7 @@ public:
                          const efpPartitionNumber_t defaultPartitionNumber,
                          const efpDataSize_kib_t defaultEfpDataSize_kib,
                          const bool overwriteBeforeReturnFlag,
+                         const bool truncateFlag,
                          JournalLog& journalLogRef_);
     virtual ~EmptyFilePoolManager();
 
@@ -72,6 +74,8 @@ public:
     void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
                            const efpPartitionNumber_t efpPartitionNumber = 0);
     uint16_t getNumEfpPartitions() const;
+protected:
+    EmptyFilePoolPartition* insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath);
 };
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp Tue Nov  4 14:58:47 2014
@@ -26,22 +26,19 @@
 #include "qpid/linearstore/journal/jdir.h"
 #include "qpid/linearstore/journal/slock.h"
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
 
-// static
-const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
-
 EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
                                                const std::string& partitionDir,
                                                const bool overwriteBeforeReturnFlag,
+                                               const bool truncateFlag,
                                                JournalLog& journalLogRef) :
                 partitionNum_(partitionNum),
                 partitionDir_(partitionDir),
                 overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+                truncateFlag_(truncateFlag),
                 journalLogRef_(journalLogRef)
 {
     validatePartitionDir();
@@ -57,25 +54,13 @@ EmptyFilePoolPartition::~EmptyFilePoolPa
 
 void
 EmptyFilePoolPartition::findEmptyFilePools() {
-//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
+//std::cout << "*** Reading " << partitionDir_ << std::endl; // DEBUG
     std::vector<std::string> dirList;
-    jdir::read_dir(partitionDir_, dirList, true, false, false, false);
-    bool foundEfpDir = false;
-    for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
-        if (i->compare(s_efpTopLevelDir_) == 0) {
-            foundEfpDir = true;
-            break;
-        }
-    }
-    if (foundEfpDir) {
-        std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
-//std::cout << "Reading " << efpDir << std::endl; // DEBUG
-        dirList.clear();
-        jdir::read_dir(efpDir, dirList, true, false, false, true);
+        jdir::read_dir(partitionDir_, dirList, true, false, false, true);
         for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
             EmptyFilePool* efpp = 0;
             try {
-                efpp = new EmptyFilePool(*i, this, journalLogRef_);
+                efpp = new EmptyFilePool(*i, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
                 {
                     slock l(efpMapMutex_);
                     efpMap_[efpp->dataSize_kib()] = efpp;
@@ -88,13 +73,14 @@ EmptyFilePoolPartition::findEmptyFilePoo
                 }
                 //std::cerr << "WARNING: " << e.what() << std::endl;
             }
-            if (efpp != 0)
+            if (efpp != 0) {
                 efpp->initialize();
+            }
         }
-    }
 }
 
 EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+    slock l(efpMapMutex_);
     efpMapItr_t i = efpMap_.find(efpDataSize_kib);
     if (i == efpMap_.end())
         return 0;
@@ -102,21 +88,19 @@ EmptyFilePool* EmptyFilePoolPartition::g
 }
 
 void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
+    slock l(efpMapMutex_);
     for (efpMapItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
         efpList.push_back(i->second);
     }
 }
 
 void EmptyFilePoolPartition::getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList_kib) const {
+    slock l(efpMapMutex_);
     for (efpMapConstItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
         efpDataSizesList_kib.push_back(i->first);
     }
 }
 
-bool EmptyFilePoolPartition::getOverwriteBeforeReturnFlag() const {
-    return overwriteBeforeReturnFlag_;
-}
-
 std::string EmptyFilePoolPartition::getPartitionDirectory() const {
     return partitionDir_;
 }
@@ -125,6 +109,32 @@ efpPartitionNumber_t EmptyFilePoolPartit
     return partitionNum_;
 }
 
+std::string EmptyFilePoolPartition::toString(const uint16_t indent) const {
+    std::string indentStr(indent, ' ');
+    std::stringstream oss;
+    oss << "EFP Partition " <<  partitionNum_ << ":" << std::endl;
+    oss << indentStr  << "EFP Partition Analysis (partition " << partitionNum_ << " at \"" << partitionDir_ << "\"):" << std::endl;
+    if (efpMap_.empty()) {
+        oss << indentStr << "<Partition empty, no EFPs found>" << std::endl;
+    } else {
+        oss << indentStr << std::setw(12) << "efp_size_kib"
+                         << std::setw(12) << "num_files"
+                         << std::setw(18) << "tot_capacity_kib" << std::endl;
+        oss << indentStr << std::setw(12) << "------------"
+                         << std::setw(12) << "----------"
+                         << std::setw(18) << "----------------" << std::endl;
+        {
+            slock l(efpMapMutex_);
+            for (efpMapConstItr_t i=efpMap_.begin(); i!= efpMap_.end(); ++i) {
+                oss << indentStr << std::setw(12) << i->first
+                                 << std::setw(12) << i->second->numEmptyFiles()
+                                 << std::setw(18) << i->second->cumFileSize_kib() << std::endl;
+            }
+        }
+    }
+    return oss.str();
+}
+
 // static
 std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNumber_t partitionNumber) {
     std::ostringstream oss;
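
Note on EmptyFilePoolPartition::toString(): the new method reports the pools as right-aligned, fixed-width columns built with std::setw(). A minimal standalone sketch of that layout follows. It is an illustration only: PoolStats and formatPartition() are hypothetical names, a plain std::map stands in for efpMap_, and no locking is shown.

```cpp
#include <cassert>
#include <cstdint>
#include <iomanip>
#include <map>
#include <sstream>
#include <string>

// Hypothetical stand-in for the per-pool figures reported by EmptyFilePool.
struct PoolStats {
    uint32_t numFiles;
    uint64_t totalKib;
};

// Renders one row per pool, right-aligned in fixed-width columns. Every line
// after the first is prefixed with 'indent' spaces; the first line is not
// indented, which matches the layout in the diff.
std::string formatPartition(uint16_t partitionNum,
                            const std::map<uint64_t, PoolStats>& pools,
                            uint16_t indent) {
    const std::string indentStr(indent, ' ');
    std::ostringstream oss;
    oss << "EFP Partition " << partitionNum << ":\n";
    if (pools.empty()) {
        oss << indentStr << "<Partition empty, no EFPs found>\n";
        return oss.str();
    }
    oss << indentStr << std::setw(12) << "efp_size_kib"
                     << std::setw(12) << "num_files"
                     << std::setw(18) << "tot_capacity_kib" << '\n';
    for (std::map<uint64_t, PoolStats>::const_iterator i = pools.begin();
         i != pools.end(); ++i) {
        oss << indentStr << std::setw(12) << i->first
                         << std::setw(12) << i->second.numFiles
                         << std::setw(18) << i->second.totalKib << '\n';
    }
    return oss.str();
}
```

Because std::setw() only affects the next insertion, each column needs its own width, which is why the diff repeats the manipulator in front of every field.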

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h Tue Nov  4 14:58:47 2014
@@ -37,8 +37,6 @@ class JournalLog;
 
 class EmptyFilePoolPartition
 {
-public:
-    static const std::string s_efpTopLevelDir_;
 protected:
     typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
     typedef efpMap_t::iterator efpMapItr_t;
@@ -47,6 +45,7 @@ protected:
     const efpPartitionNumber_t partitionNum_;
     const std::string partitionDir_;
     const bool overwriteBeforeReturnFlag_;
+    const bool truncateFlag_;
     JournalLog& journalLogRef_;
     efpMap_t efpMap_;
     smutex efpMapMutex_;
@@ -55,6 +54,7 @@ public:
     EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
                            const std::string& partitionDir,
                            const bool overwriteBeforeReturnFlag,
+                           const bool truncateFlag,
                            JournalLog& journalLogRef);
     virtual ~EmptyFilePoolPartition();
 
@@ -62,9 +62,9 @@ public:
     EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
     void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
     void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
-    bool getOverwriteBeforeReturnFlag() const;
     std::string getPartitionDirectory() const;
     efpPartitionNumber_t getPartitionNumber() const;
+    std::string toString(const uint16_t indent) const;
 
     static std::string getPartionDirectoryName(const efpPartitionNumber_t partitionNumber);
     static efpPartitionNumber_t getPartitionNumber(const std::string& name);

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h Tue Nov  4 14:58:47 2014
@@ -23,6 +23,7 @@
 #define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_
 
 #include <iostream>
+#include <sstream>
 #include <stdint.h>
 
 namespace qpid {
@@ -42,7 +43,13 @@ typedef struct efpIdentity_t {
     efpIdentity_t() : pn_(0), ds_(0) {}
     efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {}
     efpIdentity_t(const efpIdentity_t& ei) : pn_(ei.pn_), ds_(ei.ds_) {}
-    friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; }
+    friend std::ostream& operator<<(std::ostream& os, const efpIdentity_t& id) {
+        // This two-stage write allows this << operator to be used with std::setw() for formatted writes
+        std::ostringstream oss;
+        oss << id.pn_ << "," << id.ds_;
+        os << oss.str();
+        return os;
+    }
 } efpIdentity_t;
 
 }}}
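
Note on the two-stage write in operator<<: std::setw() applies only to the next formatted insertion, so an operator that streams pn_, "," and ds_ one after another pads only pn_. Composing the text in a temporary stream and inserting it as one string lets the width apply to the whole token. A minimal standalone sketch follows. It is an illustration only: Identity, singleStage() and twoStage() are hypothetical names, with Identity standing in for efpIdentity_t.

```cpp
#include <cassert>
#include <cstdint>
#include <iomanip>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical stand-in for efpIdentity_t.
struct Identity {
    uint16_t pn_;
    uint64_t ds_;
};

// Two-stage write: compose the text first, then insert it as a single
// string so that a preceding std::setw() pads the whole "pn,ds" token.
std::ostream& operator<<(std::ostream& os, const Identity& id) {
    std::ostringstream tmp;
    tmp << id.pn_ << "," << id.ds_;
    return os << tmp.str();
}

// Single-stage variant for comparison: std::setw() is consumed by the first
// insertion (pn_), so only that field is padded.
std::string singleStage(const Identity& id, int width) {
    std::ostringstream oss;
    oss << std::setw(width) << id.pn_ << "," << id.ds_;
    return oss.str();
}

// Uses the two-stage operator<< above.
std::string twoStage(const Identity& id, int width) {
    std::ostringstream oss;
    oss << std::setw(width) << id;
    return oss.str();
}
```

With a width of 12 and the identity (1, 2048), twoStage() yields a 12-character field ending in "1,2048", while singleStage() pads only the "1" and so produces a longer, misaligned string. This is what allows RecoveryManager::toString() to write the efp_id column with a single std::setw(12).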

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp Tue Nov  4 14:58:47 2014
@@ -96,7 +96,7 @@ uint64_t LinearFileController::getNextRe
 
 void LinearFileController::removeFileToEfp(const std::string& fileName) {
     if (emptyFilePoolPtr_) {
-        emptyFilePoolPtr_->returnEmptyFile(fileName);
+        emptyFilePoolPtr_->returnEmptyFileSymlink(fileName);
     }
 }
 
@@ -108,7 +108,7 @@ void LinearFileController::restoreEmptyF
 void LinearFileController::purgeEmptyFilesToEfp() {
     slock l(journalFileListMutex_);
     while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
-        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+        emptyFilePoolPtr_->returnEmptyFileSymlink(journalFileList_.front()->getFqFileName());
         delete journalFileList_.front();
         journalFileList_.pop_front();
     }
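
Note on purgeEmptyFilesToEfp(): the loop above hands leading journal files with no enqueued records back to the pool, but always keeps the last file. A minimal standalone sketch of that rule follows. It is an illustration only: JournalFile and purgeEmptyFiles() are hypothetical names, a std::deque stands in for journalFileList_, and the returned vector of names stands in for the calls to returnEmptyFileSymlink().

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a journal file: a name plus the number of
// records in it that are still enqueued.
struct JournalFile {
    std::string name;
    uint32_t enqueuedRecords;
};

// Pops files from the front of the list while they hold no enqueued records,
// but never removes the last remaining file. The names of the purged files
// are returned; in the real code each would be handed back to the EFP.
std::vector<std::string> purgeEmptyFiles(std::deque<JournalFile>& files) {
    std::vector<std::string> returned;
    while (files.size() > 1 && files.front().enqueuedRecords == 0) {
        returned.push_back(files.front().name);
        files.pop_front();
    }
    return returned;
}
```

The sketch tests the size before looking at the front element, so front() is never evaluated on an empty container; the loop in the diff evaluates front() first and therefore relies on the list being non-empty.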

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Tue Nov  4 14:58:47 2014
@@ -322,36 +322,7 @@ void RecoveryManager::setLinearFileContr
     }
 }
 
-std::string RecoveryManager::toString(const std::string& jid) {
-    std::ostringstream oss;
-    oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
-    oss << "  Number of journal files = " << fileNumberMap_.size() << std::endl;
-    oss << "  Journal File List:" << std::endl;
-    for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
-        std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
-        oss << "    " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
-    }
-    oss << "  Enqueue Counts: [ ";
-    for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
-        if (l != fileNumberMap_.begin()) {
-            oss << ", ";
-        }
-        oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
-    }
-    oss << " ]" << std::endl;
-    oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
-    oss << "  First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
-            std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-    oss << "  End offset = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
-            (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-    oss << "  Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
-    oss << "  Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
-    oss << "  Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
-    oss << "  Enqueued records (txn & non-txn):" << std::endl;
-    return oss.str();
-}
-
-std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+std::string RecoveryManager::toString(const std::string& jid, const uint16_t indent) const {
     std::string indentStr(indent, ' ');
     std::ostringstream oss;
     oss << std::endl << indentStr  << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
@@ -360,18 +331,17 @@ std::string RecoveryManager::toLog(const
     } else {
         oss << indentStr << std::setw(7) << "file_id"
                          << std::setw(43) << "file_name"
-                         << std::setw(16) << "fro"
                          << std::setw(12) << "record_cnt"
-                         << std::setw(5) << "ptn"
-                         << std::setw(10) << "efp"
+                         << std::setw(16) << "fro"
+                         << std::setw(12) << "efp_id"
                          << std::endl;
         oss << indentStr << std::setw(7) << "-------"
                          << std::setw(43) << "-----------------------------------------"
+                         << std::setw(12) << "----------"
                          << std::setw(16) << "--------------"
                          << std::setw(12) << "----------"
-                         << std::setw(5) << "---"
-                         << std::setw(10) << "--------"
                          << std::endl;
+        uint32_t totalRecordCount(0UL);
         for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
             std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
             std::ostringstream fid;
@@ -380,19 +350,20 @@ std::string RecoveryManager::toLog(const
             fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
             oss << indentStr << std::setw(7) << fid.str()
                              << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
-                             << std::setw(16) << fro.str()
                              << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
-                             << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
-                             << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
+                             << std::setw(16) << fro.str()
+                             << std::setw(12) << k->second->journalFilePtr_->getEfpIdentity()
                              << std::endl;
+            totalRecordCount += k->second->journalFilePtr_->getEnqueuedRecordCount();
         }
+        oss << indentStr << std::setw(62) << "----------" << std::endl;
+        oss << indentStr << std::setw(62) << totalRecordCount << std::endl;
         oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
                 std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
         oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
                 (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
         oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
         oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
-        //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
     }
     return oss.str();
 }
@@ -942,7 +913,7 @@ bool RecoveryManager::readJournalFileHea
 void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
     while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
         RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
-        emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+        emptyFilePoolPtr->returnEmptyFileSymlink(rfdp->journalFilePtr_->getFqFileName());
         delete rfdp->journalFilePtr_;
         delete rfdp;
         fileNumberMap_.erase(fileNumberMap_.begin()->first);

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h Tue Nov  4 14:58:47 2014
@@ -125,8 +125,7 @@ public:
     void recoveryComplete();
     void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
                                          LinearFileController* lfcPtr);
-    std::string toString(const std::string& jid);
-    std::string toLog(const std::string& jid, const int indent);
+    std::string toString(const std::string& jid, const uint16_t indent) const;
 protected:
     void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
     void checkFileStreamOk(bool checkEof);

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp Tue Nov  4 14:58:47 2014
@@ -119,7 +119,7 @@ jcntl::recover(EmptyFilePoolManager* efp
     assert(_emptyFilePoolPtr != 0);
 
     highest_rid = _recoveryManager.getHighestRecordId();
-    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
+    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, 5U));
     _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
     _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
     if (_recoveryManager.isLastFileFull()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp Tue Nov  4 14:58:47 2014
@@ -112,6 +112,7 @@ const uint32_t jerrno::JERR_EFP_BADPARTI
 const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME    = 0x0d03;
 const uint32_t jerrno::JERR_EFP_NOEFP            = 0x0d04;
 const uint32_t jerrno::JERR_EFP_EMPTY            = 0x0d05;
+const uint32_t jerrno::JERR_EFP_SYMLINK          = 0x0d06;
 
 // Negative returns for some functions
 const int32_t jerrno::AIO_TIMEOUT                = -1;
@@ -206,6 +207,7 @@ jerrno::__init()
     _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
     _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
     _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
+    _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed";
 
     //_err_map[] = "";
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h Tue Nov  4 14:58:47 2014
@@ -130,6 +130,7 @@ namespace journal {
         static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
         static const uint32_t JERR_EFP_NOEFP;           ///< No EFP found for given partition and file size
         static const uint32_t JERR_EFP_EMPTY;           ///< EFP empty
+        static const uint32_t JERR_EFP_SYMLINK;         ///< Symbolic Link operation failed
 
         // Negative returns for some functions
         static const int32_t AIO_TIMEOUT;               ///< Timeout waiting for AIO return


