Author: shuston Date: Wed Nov 3 23:12:08 2010 New Revision: 1030752 URL: http://svn.apache.org/viewvc?rev=1030752&view=rev Log: Catch COM exceptions during DB recovery and rethrow them as ADOExceptions, which the broker can deal with. Resolves QPID-2925.
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp?rev=1030752&r1=1030751&r2=1030752&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp Wed Nov 3 23:12:08 2010 @@ -1025,75 +1025,99 @@ MSSqlProvider::collectPreparedXids(std:: void MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer) { - DatabaseConnection *db = initConnection(); - BlobRecordset rsConfigs; - rsConfigs.open(db, TblConfig); - _RecordsetPtr p = (_RecordsetPtr)rsConfigs; - if (p->BOF && p->EndOfFile) - return; // Nothing to do - p->MoveFirst(); - while (!p->EndOfFile) { - uint64_t id = p->Fields->Item["persistenceId"]->Value; - long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - // Recreate the Config instance and reset its ID. - broker::RecoverableConfig::shared_ptr config = - recoverer.recoverConfig(blob); - config->setPersistenceId(id); - p->MoveNext(); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsConfigs; + rsConfigs.open(db, TblConfig); + _RecordsetPtr p = (_RecordsetPtr)rsConfigs; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Config instance and reset its ID. 
+ broker::RecoverableConfig::shared_ptr config = + recoverer.recoverConfig(blob); + config->setPersistenceId(id); + p->MoveNext(); + } } + catch(_com_error &e) { + throw ADOException("Error recovering configs", + e, + db ? db->getErrors() : ""); + } } void MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer, ExchangeMap& exchangeMap) { - DatabaseConnection *db = initConnection(); - BlobRecordset rsExchanges; - rsExchanges.open(db, TblExchange); - _RecordsetPtr p = (_RecordsetPtr)rsExchanges; - if (p->BOF && p->EndOfFile) - return; // Nothing to do - p->MoveFirst(); - while (!p->EndOfFile) { - uint64_t id = p->Fields->Item["persistenceId"]->Value; - long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - // Recreate the Exchange instance, reset its ID, and remember the - // ones restored for matching up when recovering bindings. - broker::RecoverableExchange::shared_ptr exchange = - recoverer.recoverExchange(blob); - exchange->setPersistenceId(id); - exchangeMap[id] = exchange; - p->MoveNext(); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsExchanges; + rsExchanges.open(db, TblExchange); + _RecordsetPtr p = (_RecordsetPtr)rsExchanges; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Exchange instance, reset its ID, and remember the + // ones restored for matching up when recovering bindings. 
+ broker::RecoverableExchange::shared_ptr exchange = + recoverer.recoverExchange(blob); + exchange->setPersistenceId(id); + exchangeMap[id] = exchange; + p->MoveNext(); + } } + catch(_com_error &e) { + throw ADOException("Error recovering exchanges", + e, + db ? db->getErrors() : ""); + } } void MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, QueueMap& queueMap) { - DatabaseConnection *db = initConnection(); - BlobRecordset rsQueues; - rsQueues.open(db, TblQueue); - _RecordsetPtr p = (_RecordsetPtr)rsQueues; - if (p->BOF && p->EndOfFile) - return; // Nothing to do - p->MoveFirst(); - while (!p->EndOfFile) { - uint64_t id = p->Fields->Item["persistenceId"]->Value; - long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - // Recreate the Queue instance and reset its ID. - broker::RecoverableQueue::shared_ptr queue = - recoverer.recoverQueue(blob); - queue->setPersistenceId(id); - queueMap[id] = queue; - p->MoveNext(); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Queue instance and reset its ID. + broker::RecoverableQueue::shared_ptr queue = + recoverer.recoverQueue(blob); + queue->setPersistenceId(id); + queueMap[id] = queue; + p->MoveNext(); + } } + catch(_com_error &e) { + throw ADOException("Error recovering queues", + e, + db ? 
db->getErrors() : ""); + } } void @@ -1101,10 +1125,18 @@ MSSqlProvider::recoverBindings(qpid::bro const ExchangeMap& exchangeMap, const QueueMap& queueMap) { - DatabaseConnection *db = initConnection(); - BindingRecordset rsBindings; - rsBindings.open(db, TblBinding); - rsBindings.recover(recoverer, exchangeMap, queueMap); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BindingRecordset rsBindings; + rsBindings.open(db, TblBinding); + rsBindings.recover(recoverer, exchangeMap, queueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering bindings", + e, + db ? db->getErrors() : ""); + } } void @@ -1112,14 +1144,22 @@ MSSqlProvider::recoverMessages(qpid::bro MessageMap& messageMap, MessageQueueMap& messageQueueMap) { - DatabaseConnection *db = initConnection(); - MessageRecordset rsMessages; - rsMessages.open(db, TblMessage); - rsMessages.recover(recoverer, messageMap); - - MessageMapRecordset rsMessageMaps; - rsMessageMaps.open(db, TblMessageMap); - rsMessageMaps.recover(messageQueueMap); + DatabaseConnection *db = 0; + try { + db = initConnection(); + MessageRecordset rsMessages; + rsMessages.open(db, TblMessage); + rsMessages.recover(recoverer, messageMap); + + MessageMapRecordset rsMessageMaps; + rsMessageMaps.open(db, TblMessageMap); + rsMessageMaps.recover(messageQueueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering messages", + e, + db ? db->getErrors() : ""); + } } void --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org