When active ntfd receives reader API requests: ReaderIntialize, ReadNext, ReadFinalize, active ntfd does not update the readers information to the standby. Thus, either switchover or failover happens, the client can not continue to use the reader APIs, because there is no such reader information still available in the new active after switchover.
The patch does checkpoint reader information to the standby when completes processing reader APIs request. --- src/ntf/agent/ntfa_mds.c | 51 +++------------ src/ntf/common/ntfsv_enc_dec.c | 89 ++++++++++++++++++++++---- src/ntf/common/ntfsv_enc_dec.h | 12 ++-- src/ntf/ntfd/NtfAdmin.cc | 82 ++++++++++++++++-------- src/ntf/ntfd/NtfAdmin.h | 12 ++-- src/ntf/ntfd/NtfClient.cc | 61 +++++++++++++----- src/ntf/ntfd/NtfClient.h | 11 ++-- src/ntf/ntfd/NtfLogger.cc | 2 +- src/ntf/ntfd/NtfReader.cc | 66 ++++++++++--------- src/ntf/ntfd/NtfReader.h | 12 +++- src/ntf/ntfd/ntfs_com.c | 79 +++++++++++++++++++++++ src/ntf/ntfd/ntfs_com.h | 17 +++-- src/ntf/ntfd/ntfs_evt.c | 14 ++--- src/ntf/ntfd/ntfs_mbcsv.c | 140 ++++++++++++++++++++++++++++++++++++++++- src/ntf/ntfd/ntfs_mbcsv.h | 16 +++++ src/ntf/ntfd/ntfs_mds.c | 42 +++---------- 16 files changed, 516 insertions(+), 190 deletions(-) diff --git a/src/ntf/agent/ntfa_mds.c b/src/ntf/agent/ntfa_mds.c index 41d58e8..0b088ea 100644 --- a/src/ntf/agent/ntfa_mds.c +++ b/src/ntf/agent/ntfa_mds.c @@ -162,7 +162,8 @@ static uint32_t ntfa_enc_send_not_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) ******************************************************************************/ static uint32_t ntfa_enc_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - return ntfsv_enc_reader_initialize_msg(uba, msg); + return ntfsv_enc_reader_initialize_msg(uba, + &msg->info.api_info.param.reader_init); } /**************************************************************************** @@ -178,10 +179,11 @@ static uint32_t ntfa_enc_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) Notes : None. ******************************************************************************/ -static uint32_t ntfa_enc_reader_initialize_msg_2(NCS_UBAID *uba, +static uint32_t ntfa_enc_reader_initialize_2_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - return ntfsv_enc_reader_initialize_msg_2(uba, msg); + return ntfsv_enc_reader_initialize_2_msg(uba, + &msg->info.api_info.param.reader_init_2); } /**************************************************************************** @@ -198,25 +200,8 @@ static uint32_t ntfa_enc_reader_initialize_msg_2(NCS_UBAID *uba, ******************************************************************************/ static uint32_t ntfa_enc_reader_finalize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - uint8_t *p8; - ntfsv_reader_finalize_req_t *param = - &msg->info.api_info.param.reader_finalize; - - TRACE_ENTER(); - osafassert(uba != NULL); - - /** encode the contents **/ - p8 = ncs_enc_reserve_space(uba, 8); - if (!p8) { - TRACE("NULL pointer"); - return NCSCC_RC_OUT_OF_MEM; - } - ncs_encode_32bit(&p8, param->client_id); - ncs_encode_32bit(&p8, param->readerId); - ncs_enc_claim_space(uba, 8); - - TRACE_LEAVE(); - return NCSCC_RC_SUCCESS; + return ntfsv_enc_read_finalize_msg(uba, + &msg->info.api_info.param.reader_finalize); } /**************************************************************************** @@ -233,25 +218,7 @@ static uint32_t ntfa_enc_reader_finalize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) ******************************************************************************/ static uint32_t ntfa_enc_read_next_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - uint8_t *p8; - ntfsv_read_next_req_t *param = &msg->info.api_info.param.read_next; - - TRACE_ENTER(); - osafassert(uba != NULL); - - /** encode the contents **/ - p8 = ncs_enc_reserve_space(uba, 10); - if (!p8) { - TRACE("NULL pointer"); - return NCSCC_RC_OUT_OF_MEM; - } - ncs_encode_32bit(&p8, param->client_id); - ncs_encode_8bit(&p8, param->searchDirection); - ncs_encode_32bit(&p8, param->readerId); - ncs_enc_claim_space(uba, 10); - - TRACE_LEAVE(); - return NCSCC_RC_SUCCESS; + return ntfsv_enc_read_next_msg(uba, &msg->info.api_info.param.read_next); } /**************************************************************************** @@ -576,7 +543,7 @@ static uint32_t ntfa_mds_enc(struct ncsmds_callback_info *info) break; case NTFSV_READER_INITIALIZE_REQ_2: - rc = ntfa_enc_reader_initialize_msg_2(uba, msg); + rc = ntfa_enc_reader_initialize_2_msg(uba, msg); break; case NTFSV_READER_FINALIZE_REQ: diff --git a/src/ntf/common/ntfsv_enc_dec.c b/src/ntf/common/ntfsv_enc_dec.c index 061b7e2..d38c401 100644 --- a/src/ntf/common/ntfsv_enc_dec.c +++ b/src/ntf/common/ntfsv_enc_dec.c @@ -2165,10 +2165,9 @@ uint32_t ntfsv_dec_unsubscribe_msg(NCS_UBAID *uba, return rv; } -uint32_t ntfsv_enc_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) +uint32_t ntfsv_enc_reader_initialize_msg(NCS_UBAID *uba, ntfsv_reader_init_req_t *param) { uint8_t *p8; - ntfsv_reader_init_req_t *param = &msg->info.api_info.param.reader_init; TRACE_ENTER(); osafassert(uba != NULL); @@ -2189,10 +2188,9 @@ uint32_t ntfsv_enc_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) return NCSCC_RC_SUCCESS; } -uint32_t ntfsv_dec_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) +uint32_t ntfsv_dec_reader_initialize_msg(NCS_UBAID *uba, ntfsv_reader_init_req_t *param) { uint8_t *p8; - ntfsv_reader_init_req_t *param = &msg->info.api_info.param.reader_init; uint8_t local_data[22]; /* releaseCode, majorVersion, minorVersion */ @@ -2207,22 +2205,91 @@ uint32_t ntfsv_dec_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) return NCSCC_RC_SUCCESS; } -uint32_t ntfsv_enc_reader_initialize_msg_2(NCS_UBAID *uba, ntfsv_msg_t *msg) +uint32_t ntfsv_enc_reader_initialize_2_msg(NCS_UBAID *uba, ntfsv_reader_init_req_2_t *param) { uint32_t rv; - rv = ntfsv_enc_reader_initialize_msg(uba, msg); + rv = ntfsv_enc_reader_initialize_msg(uba, ¶m->head); if (rv != NCSCC_RC_SUCCESS) return rv; return ntfsv_enc_filter_msg( - uba, &msg->info.api_info.param.reader_init_2.f_rec); + uba, ¶m->f_rec); } -uint32_t ntfsv_dec_reader_initialize_msg_2(NCS_UBAID *uba, ntfsv_msg_t *msg) +uint32_t ntfsv_dec_reader_initialize_2_msg(NCS_UBAID *uba, ntfsv_reader_init_req_2_t *param) { uint32_t rv; - rv = ntfsv_dec_reader_initialize_msg(uba, msg); + rv = ntfsv_dec_reader_initialize_msg(uba, ¶m->head); if (rv != NCSCC_RC_SUCCESS) return rv; - return ntfsv_dec_filter_msg( - uba, &msg->info.api_info.param.reader_init_2.f_rec); + return ntfsv_dec_filter_msg(uba, ¶m->f_rec); } + +uint32_t ntfsv_enc_read_next_msg(NCS_UBAID *uba, ntfsv_read_next_req_t *param) +{ + uint8_t *p8; + TRACE_ENTER(); + osafassert(uba != NULL); + + /** encode the contents **/ + p8 = ncs_enc_reserve_space(uba, 10); + if (!p8) { + TRACE("NULL pointer"); + return NCSCC_RC_OUT_OF_MEM; + } + ncs_encode_32bit(&p8, param->client_id); + ncs_encode_8bit(&p8, param->searchDirection); + ncs_encode_32bit(&p8, param->readerId); + ncs_enc_claim_space(uba, 10); + + TRACE_LEAVE(); + return NCSCC_RC_SUCCESS; +} + +uint32_t ntfsv_dec_read_next_msg(NCS_UBAID *uba, ntfsv_read_next_req_t *param) +{ + uint8_t *p8; + uint8_t local_data[10]; + + p8 = ncs_dec_flatten_space(uba, local_data, 10); + param->client_id = ncs_decode_32bit(&p8); + param->searchDirection = ncs_decode_8bit(&p8); + param->readerId = ncs_decode_32bit(&p8); + ncs_dec_skip_space(uba, 10); + + return NCSCC_RC_SUCCESS; +} + +uint32_t ntfsv_enc_read_finalize_msg(NCS_UBAID *uba, ntfsv_reader_finalize_req_t *param) +{ + uint8_t *p8; + + TRACE_ENTER(); + osafassert(uba != NULL); + + /** encode the contents **/ + p8 = ncs_enc_reserve_space(uba, 8); + if (!p8) { + TRACE("NULL pointer"); + return NCSCC_RC_OUT_OF_MEM; + } + ncs_encode_32bit(&p8, param->client_id); + ncs_encode_32bit(&p8, param->readerId); + ncs_enc_claim_space(uba, 8); + + TRACE_LEAVE(); + return NCSCC_RC_SUCCESS; +} + +uint32_t ntfsv_dec_read_finalize_msg(NCS_UBAID *uba, ntfsv_reader_finalize_req_t *param) +{ + uint8_t *p8; + uint8_t local_data[8]; + + p8 = ncs_dec_flatten_space(uba, local_data, 8); + param->client_id = ncs_decode_32bit(&p8); + param->readerId = ncs_decode_32bit(&p8); + ncs_dec_skip_space(uba, 8); + + return NCSCC_RC_SUCCESS; +} + diff --git a/src/ntf/common/ntfsv_enc_dec.h b/src/ntf/common/ntfsv_enc_dec.h index 0f9c4fd..9748b96 100644 --- a/src/ntf/common/ntfsv_enc_dec.h +++ b/src/ntf/common/ntfsv_enc_dec.h @@ -42,10 +42,14 @@ uint32_t ntfsv_enc_unsubscribe_msg(NCS_UBAID *uba, ntfsv_unsubscribe_req_t *param); uint32_t ntfsv_dec_unsubscribe_msg(NCS_UBAID *uba, ntfsv_unsubscribe_req_t *param); -uint32_t ntfsv_enc_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg); -uint32_t ntfsv_dec_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg); -uint32_t ntfsv_enc_reader_initialize_msg_2(NCS_UBAID *uba, ntfsv_msg_t *msg); -uint32_t ntfsv_dec_reader_initialize_msg_2(NCS_UBAID *uba, ntfsv_msg_t *msg); +uint32_t ntfsv_enc_reader_initialize_msg(NCS_UBAID *uba, ntfsv_reader_init_req_t *param); +uint32_t ntfsv_dec_reader_initialize_msg(NCS_UBAID *uba, ntfsv_reader_init_req_t *param); +uint32_t ntfsv_enc_reader_initialize_2_msg(NCS_UBAID *uba, ntfsv_reader_init_req_2_t *param); +uint32_t ntfsv_dec_reader_initialize_2_msg(NCS_UBAID *uba, ntfsv_reader_init_req_2_t *param); +uint32_t ntfsv_enc_read_next_msg(NCS_UBAID *uba, ntfsv_read_next_req_t *param); +uint32_t ntfsv_dec_read_next_msg(NCS_UBAID *uba, ntfsv_read_next_req_t *param); +uint32_t ntfsv_enc_read_finalize_msg(NCS_UBAID *uba, ntfsv_reader_finalize_req_t *param); +uint32_t ntfsv_dec_read_finalize_msg(NCS_UBAID *uba, ntfsv_reader_finalize_req_t *param); uint32_t ntfsv_enc_64bit_msg(NCS_UBAID *uba, uint64_t param); uint32_t ntfsv_dec_64bit_msg(NCS_UBAID *uba, uint64_t *param); uint32_t ntfsv_enc_32bit_msg(NCS_UBAID *uba, uint32_t param); diff --git a/src/ntf/ntfd/NtfAdmin.cc b/src/ntf/ntfd/NtfAdmin.cc index 84c7e26..1242129 100644 --- a/src/ntf/ntfd/NtfAdmin.cc +++ b/src/ntf/ntfd/NtfAdmin.cc @@ -743,29 +743,58 @@ NtfClient *NtfAdmin::getClient(unsigned int clientId) { } /** - * Find the client and call method newReader + * The method create a new instance of NtfReader that + * has NO filter * - * @param clientId - * @param searchCriteria - * @param f_rec - filter record + * @param rp: the original reader initialize request * @param mdsCtxt + * @return pointer of new instance NtfReader */ -void NtfAdmin::newReader(unsigned int clientId, - SaNtfSearchCriteriaT searchCriteria, - ntfsv_filter_ptrs_t *f_rec, - MDS_SYNC_SND_CTXT *mdsCtxt) { +NtfReader* NtfAdmin::createReaderWithoutFilter(ntfsv_reader_init_req_t rp, + MDS_SYNC_SND_CTXT *mdsCtxt) { + + TRACE_ENTER(); + NtfReader* newReader = nullptr; + unsigned int clientId = rp.client_id; + ClientMap::iterator pos; + pos = clientMap.find(clientId); + if (pos != clientMap.end()) { + // we have got the client + NtfClient *client = pos->second; + newReader = client->createReaderWithoutFilter(rp, mdsCtxt); + } else { + // client object does not exist + LOG_WA("NtfAdmin::newReader client not found %u", clientId); + } + TRACE_LEAVE(); + return newReader; +} +/** + * The method create a new instance of NtfReader that + * has filter + * + * @param rp: the original reader initialize request version 2 + * @param mdsCtxt + * @return pointer of new instance NtfReader + */ +NtfReader* NtfAdmin::createReaderWithFilter(ntfsv_reader_init_req_2_t rp, + MDS_SYNC_SND_CTXT *mdsCtxt) { + TRACE_ENTER(); + NtfReader* newReader = nullptr; + unsigned int clientId = rp.head.client_id; ClientMap::iterator pos; pos = clientMap.find(clientId); if (pos != clientMap.end()) { // we have got the client NtfClient *client = pos->second; - client->newReader(searchCriteria, f_rec, mdsCtxt); + newReader = client->createReaderWithFilter(rp, mdsCtxt); } else { // client object does not exist LOG_WA("NtfAdmin::newReader client not found %u", clientId); } TRACE_LEAVE(); + return newReader; } /** @@ -775,17 +804,17 @@ void NtfAdmin::newReader(unsigned int clientId, * @param readerId unique readerId for this client * @param mdsCtxt */ -void NtfAdmin::readNext(unsigned int clientId, unsigned int readerId, - SaNtfSearchDirectionT searchDirection, +void NtfAdmin::readNext(ntfsv_read_next_req_t readNextReq, MDS_SYNC_SND_CTXT *mdsCtxt) { TRACE_ENTER(); - TRACE_6("clientId %u, readerId %u", clientId, readerId); + unsigned int clientId = readNextReq.client_id; + TRACE_6("clientId %u", clientId); ClientMap::iterator pos; pos = clientMap.find(clientId); if (pos != clientMap.end()) { // we have got the client NtfClient *client = pos->second; - client->readNext(readerId, searchDirection, mdsCtxt); + client->readNext(readNextReq, mdsCtxt); } else { // client object does not exist LOG_WA("NtfAdmin::readNext client not found %u", clientId); @@ -800,18 +829,18 @@ void NtfAdmin::readNext(unsigned int clientId, unsigned int readerId, * @param readerId unique readerId for this client * @param mdsCtxt */ -void NtfAdmin::deleteReader(unsigned int clientId, unsigned int readerId, +void NtfAdmin::deleteReader(ntfsv_reader_finalize_req_t readFinalizeReq, MDS_SYNC_SND_CTXT *mdsCtxt) { TRACE_ENTER(); ClientMap::iterator pos; - pos = clientMap.find(clientId); + pos = clientMap.find(readFinalizeReq.client_id); if (pos != clientMap.end()) { // we have got the client NtfClient *client = pos->second; - client->deleteReader(readerId, mdsCtxt); + client->deleteReader(readFinalizeReq, mdsCtxt); } else { // client object does not exist - LOG_WA("NtfAdmin::deleteReader client not found %u", clientId); + LOG_WA("NtfAdmin::deleteReader client not found %u", readFinalizeReq.client_id); } TRACE_LEAVE(); } @@ -1105,24 +1134,27 @@ void syncGlobals(const struct NtfGlobals *ntfGlobals) { NtfAdmin::theNtfAdmin->syncGlobals(*ntfGlobals); } -void newReader(unsigned int clientId, SaNtfSearchCriteriaT searchCriteria, - ntfsv_filter_ptrs_t *f_rec, MDS_SYNC_SND_CTXT *mdsCtxt) { +void createReaderWithoutFilter(ntfsv_reader_init_req_t rp, MDS_SYNC_SND_CTXT *mdsCtxt) { + osafassert(NtfAdmin::theNtfAdmin != NULL); + NtfAdmin::theNtfAdmin->createReaderWithoutFilter(rp, mdsCtxt); +} + +void createReaderWithFilter(ntfsv_reader_init_req_2_t rp, MDS_SYNC_SND_CTXT *mdsCtxt) { osafassert(NtfAdmin::theNtfAdmin != NULL); - NtfAdmin::theNtfAdmin->newReader(clientId, searchCriteria, f_rec, mdsCtxt); + NtfAdmin::theNtfAdmin->createReaderWithFilter(rp, mdsCtxt); } -void readNext(unsigned int clientId, unsigned int readerId, - SaNtfSearchDirectionT searchDirection, +void readNext(ntfsv_read_next_req_t reqNextReq, MDS_SYNC_SND_CTXT *mdsCtxt) { osafassert(NtfAdmin::theNtfAdmin != NULL); - return NtfAdmin::theNtfAdmin->readNext(clientId, readerId, searchDirection, + return NtfAdmin::theNtfAdmin->readNext(reqNextReq, mdsCtxt); } -void deleteReader(unsigned int clientId, unsigned int readerId, +void deleteReader(ntfsv_reader_finalize_req_t readFinalizeReq, MDS_SYNC_SND_CTXT *mdsCtxt) { osafassert(NtfAdmin::theNtfAdmin != NULL); - return NtfAdmin::theNtfAdmin->deleteReader(clientId, readerId, mdsCtxt); + return NtfAdmin::theNtfAdmin->deleteReader(readFinalizeReq, mdsCtxt); } void printAdminInfo() { diff --git a/src/ntf/ntfd/NtfAdmin.h b/src/ntf/ntfd/NtfAdmin.h index c3a2da1..e569ec0 100644 --- a/src/ntf/ntfd/NtfAdmin.h +++ b/src/ntf/ntfd/NtfAdmin.h @@ -72,14 +72,12 @@ class NtfAdmin { MDS_SYNC_SND_CTXT *mdsCtxt); void syncRequest(NCS_UBAID *uba); void syncGlobals(const struct NtfGlobals &ntfGlobals); - void newReader(unsigned int clientId, SaNtfSearchCriteriaT searchCriteria, - ntfsv_filter_ptrs_t *f_rec, MDS_SYNC_SND_CTXT *mdsCtxt); - - void readNext(unsigned int connectionId, unsigned int readerId, - SaNtfSearchDirectionT searchDirection, - MDS_SYNC_SND_CTXT *mdsCtxt); - void deleteReader(unsigned int connectionId, unsigned int readerId, + NtfReader* createReaderWithoutFilter(ntfsv_reader_init_req_t rp, MDS_SYNC_SND_CTXT *mdsCtxt); + NtfReader* createReaderWithFilter(ntfsv_reader_init_req_2_t rp, MDS_SYNC_SND_CTXT *mdsCtxt); + void deleteReader(ntfsv_reader_finalize_req_t readFinalizeReq, MDS_SYNC_SND_CTXT *mdsCtxt); + void readNext(ntfsv_read_next_req_t readNextReq, + MDS_SYNC_SND_CTXT *mdsCtxt); void printInfo(); void storeMatchingSubscription(SaNtfIdentifierT notificationId, diff --git a/src/ntf/ntfd/NtfClient.cc b/src/ntf/ntfd/NtfClient.cc index 7e8a926..4cd511f 100644 --- a/src/ntf/ntfd/NtfClient.cc +++ b/src/ntf/ntfd/NtfClient.cc @@ -352,26 +352,43 @@ void NtfClient::deleteReaderResponse(SaAisErrorT* error, delete_reader_res_lib(*error, mdsDest_, mdsCtxt); } -void NtfClient::newReader(SaNtfSearchCriteriaT searchCriteria, - ntfsv_filter_ptrs_t* f_rec, - MDS_SYNC_SND_CTXT* mdsCtxt) { +NtfReader* NtfClient::createReaderWithoutFilter(ntfsv_reader_init_req_t rp, + MDS_SYNC_SND_CTXT *mdsCtxt) { + SaAisErrorT error = SA_AIS_OK; readerId_++; NtfReader* reader; - if (f_rec) { - reader = new NtfReader(NtfAdmin::theNtfAdmin->logger, readerId_, - searchCriteria, f_rec); - } else { /*old API no filtering */ - reader = new NtfReader(NtfAdmin::theNtfAdmin->logger, readerId_); + + reader = new NtfReader(NtfAdmin::theNtfAdmin->logger, readerId_, &rp); + + readerMap[readerId_] = reader; + if (activeController()) { + sendReaderInitializeUpdate(&rp); + newReaderResponse(&error, readerId_, mdsCtxt); } + return reader; +} + +NtfReader* NtfClient::createReaderWithFilter(ntfsv_reader_init_req_2_t rp, + MDS_SYNC_SND_CTXT *mdsCtxt) { + SaAisErrorT error = SA_AIS_OK; + readerId_++; + NtfReader* reader; + + reader = new NtfReader(NtfAdmin::theNtfAdmin->logger, readerId_, &rp); + readerMap[readerId_] = reader; - newReaderResponse(&error, readerId_, mdsCtxt); + if (activeController()) { + sendReaderInitialize2Update(&rp); + newReaderResponse(&error, readerId_, mdsCtxt); + } + return reader; } -void NtfClient::readNext(unsigned int readerId, - SaNtfSearchDirectionT searchDirection, +void NtfClient::readNext(ntfsv_read_next_req_t readNextReq, MDS_SYNC_SND_CTXT* mdsCtxt) { TRACE_ENTER(); + unsigned int readerId = readNextReq.readerId; TRACE_6("readerId %u", readerId); // check if reader already exists SaAisErrorT error = SA_AIS_ERR_NOT_EXIST; @@ -381,8 +398,12 @@ void NtfClient::readNext(unsigned int readerId, // reader found TRACE_3("NtfClient::readNext readerId %u FOUND!", readerId); NtfReader* reader = pos->second; - NtfSmartPtr notif(reader->next(searchDirection, &error)); - readNextResponse(&error, notif, mdsCtxt); + NtfSmartPtr notif(reader->next(readNextReq.searchDirection, &error)); + if (activeController()) { + // update standby + sendReadNextUpdate(&readNextReq); + readNextResponse(&error, notif, mdsCtxt); + } TRACE_LEAVE(); return; } else { @@ -390,14 +411,17 @@ void NtfClient::readNext(unsigned int readerId, // reader not found TRACE_3("NtfClient::readNext readerId %u not found", readerId); error = SA_AIS_ERR_BAD_HANDLE; - readNextResponse(&error, notif, mdsCtxt); + if (activeController()) { + readNextResponse(&error, notif, mdsCtxt); + } TRACE_LEAVE(); } } -void NtfClient::deleteReader(unsigned int readerId, +void NtfClient::deleteReader(ntfsv_reader_finalize_req_t readFinalizeReq, MDS_SYNC_SND_CTXT* mdsCtxt) { SaAisErrorT error = SA_AIS_ERR_NOT_EXIST; ReaderMapT::iterator pos; + unsigned int readerId = readFinalizeReq.readerId; pos = readerMap.find(readerId); if (pos != readerMap.end()) { // reader found @@ -406,11 +430,16 @@ void NtfClient::deleteReader(unsigned int readerId, error = SA_AIS_OK; delete reader; readerMap.erase(pos); + if (activeController()) { + sendReadFinalizeUpdate(&readFinalizeReq); + } } else { // reader not found TRACE_3("NtfClient::readNext readerId %u not found", readerId); } - deleteReaderResponse(&error, mdsCtxt); + if (activeController()) { + deleteReaderResponse(&error, mdsCtxt); + } } void NtfClient::printInfo() { diff --git a/src/ntf/ntfd/NtfClient.h b/src/ntf/ntfd/NtfClient.h index e975d43..8792124 100644 --- a/src/ntf/ntfd/NtfClient.h +++ b/src/ntf/ntfd/NtfClient.h @@ -51,12 +51,15 @@ class NtfClient { void confirmNtfUnsubscribe(SaNtfSubscriptionIdT subscriptionId, MDS_SYNC_SND_CTXT *mdsCtxt); - void newReader(SaNtfSearchCriteriaT searchCriteria, - ntfsv_filter_ptrs_t *f_rec, MDS_SYNC_SND_CTXT *mdsCtxt); + NtfReader* createReaderWithoutFilter(ntfsv_reader_init_req_t rp, + MDS_SYNC_SND_CTXT *mdsCtxt); + NtfReader* createReaderWithFilter(ntfsv_reader_init_req_2_t rp, + MDS_SYNC_SND_CTXT *mdsCtxt); - void readNext(unsigned int readerId, SaNtfSearchDirectionT searchDirection, + void readNext(ntfsv_read_next_req_t readNextReq, MDS_SYNC_SND_CTXT *mdsCtxt); - void deleteReader(unsigned int readerId, MDS_SYNC_SND_CTXT *mdsCtxt); + void deleteReader(ntfsv_reader_finalize_req_t readFinalizeReq, + MDS_SYNC_SND_CTXT *mdsCtxt); void discardedAdd(SaNtfSubscriptionIdT subscriptionId, SaNtfIdentifierT notificationId); diff --git a/src/ntf/ntfd/NtfLogger.cc b/src/ntf/ntfd/NtfLogger.cc index 965693b..fd17c58 100644 --- a/src/ntf/ntfd/NtfLogger.cc +++ b/src/ntf/ntfd/NtfLogger.cc @@ -347,7 +347,7 @@ void NtfLogger::syncRequest(NCS_UBAID *uba) { TRACE_2("logger.coll_.size=%u", (unsigned int)coll_.size()); - sendNoOfNotifications(coll_.size(), uba); + sendNoOfCachedNotifications(coll_.size(), uba); readerNotificationListT::iterator rpos; for (rpos = coll_.begin(); rpos != coll_.end(); rpos++) { NtfSmartPtr n(*rpos); diff --git a/src/ntf/ntfd/NtfReader.cc b/src/ntf/ntfd/NtfReader.cc index e4b98cd..5f2f2c7 100644 --- a/src/ntf/ntfd/NtfReader.cc +++ b/src/ntf/ntfd/NtfReader.cc @@ -53,12 +53,14 @@ * @param readerId * */ -NtfReader::NtfReader(NtfLogger& ntfLogger, unsigned int readerId) +NtfReader::NtfReader(NtfLogger& ntfLogger, unsigned int readerId, + ntfsv_reader_init_req_t *req) : coll_(ntfLogger.coll_), ffIter(coll_.begin()), readerId_(readerId), c_filter_(0), - firstRead(true) { + firstRead_(true), + read_init_req_(*req){ searchCriteria_.eventTime = 0; searchCriteria_.notificationId = 0; searchCriteria_.searchMode = SA_NTF_SEARCH_NOTIFICATION_ID; @@ -79,20 +81,20 @@ NtfReader::NtfReader(NtfLogger& ntfLogger, unsigned int readerId) * @param firstRead */ NtfReader::NtfReader(NtfLogger& ntfLogger, unsigned int readerId, - SaNtfSearchCriteriaT searchCriteria, - ntfsv_filter_ptrs_t* f_rec) + ntfsv_reader_init_req_2_t *req) : readerId_(readerId), - searchCriteria_(searchCriteria), - c_filter_(NtfCriteriaFilter::getCriteriaFilter(searchCriteria, this)), - firstRead(true) { + firstRead_(true), + read_init_2_req_(*req){ TRACE_3("New NtfReader with filter, ntfLogger.coll_.size: %u", (unsigned int)ntfLogger.coll_.size()); - if (f_rec->alarm_filter) { - NtfFilter* filter = new NtfAlarmFilter(f_rec->alarm_filter); + searchCriteria_ = req->head.searchCriteria; + c_filter_ = NtfCriteriaFilter::getCriteriaFilter(searchCriteria_, this); + if (req->f_rec.alarm_filter) { + NtfFilter* filter = new NtfAlarmFilter(req->f_rec.alarm_filter); filterMap[filter->type()] = filter; } - if (f_rec->sec_al_filter) { - NtfFilter* filter = new NtfSecurityAlarmFilter(f_rec->sec_al_filter); + if (req->f_rec.sec_al_filter) { + NtfFilter* filter = new NtfSecurityAlarmFilter(req->f_rec.sec_al_filter); filterMap[filter->type()] = filter; } filterCacheList(ntfLogger); @@ -123,21 +125,27 @@ NtfReader::~NtfReader() { * @param ntfLogger */ void NtfReader::filterCacheList(NtfLogger& ntfLogger) { - readerNotificationListT::iterator rpos; - for (rpos = ntfLogger.coll_.begin(); rpos != ntfLogger.coll_.end(); rpos++) { - NtfSmartPtr n(*rpos); - bool rv = false; - FilterMap::iterator pos = filterMap.find(n->getNotificationType()); - if (pos != filterMap.end()) { - NtfFilter* filter = pos->second; - osafassert(filter); - rv = filter->checkFilter(n); - } - if (rv) { - if (!c_filter_->filter(n)) break; + TRACE_ENTER(); + if (c_filter_ != nullptr) { + readerNotificationListT::iterator rpos; + for (rpos = ntfLogger.coll_.begin(); rpos != ntfLogger.coll_.end(); rpos++) { + NtfSmartPtr n(*rpos); + bool rv = false; + FilterMap::iterator pos = filterMap.find(n->getNotificationType()); + if (pos != filterMap.end()) { + NtfFilter* filter = pos->second; + osafassert(filter); + rv = filter->checkFilter(n); + } + if (rv) { + if (!c_filter_->filter(n)) break; + } } + c_filter_->finalize(); + } else { + coll_ = ntfLogger.coll_; } - c_filter_->finalize(); + TRACE_LEAVE(); } /** @@ -164,19 +172,19 @@ NtfSmartPtr NtfReader::next(SaNtfSearchDirectionT direction, *error = SA_AIS_ERR_NOT_EXIST; TRACE_LEAVE(); NtfSmartPtr notif; - firstRead = false; + firstRead_ = false; return notif; } NtfSmartPtr notif(*ffIter); ffIter++; *error = SA_AIS_OK; TRACE_LEAVE(); - firstRead = false; + firstRead_ = false; return notif; } else // SA_NTF_SEARCH_OLDER { readerNotReverseIterT rIter(ffIter); - if (firstRead) + if (firstRead_) rIter--; else rIter++; @@ -186,14 +194,14 @@ NtfSmartPtr NtfReader::next(SaNtfSearchDirectionT direction, ffIter = rIter.base(); TRACE_LEAVE(); NtfSmartPtr notif; - firstRead = false; + firstRead_ = false; return notif; } NtfSmartPtr notif(*rIter); ffIter = rIter.base(); *error = SA_AIS_OK; TRACE_LEAVE(); - firstRead = false; + firstRead_ = false; return notif; } } diff --git a/src/ntf/ntfd/NtfReader.h b/src/ntf/ntfd/NtfReader.h index a82e1cb..7ae983d 100644 --- a/src/ntf/ntfd/NtfReader.h +++ b/src/ntf/ntfd/NtfReader.h @@ -48,13 +48,17 @@ class NtfReader { friend class NtfCriteriaFilter; public: - NtfReader(NtfLogger& ntfLogger, unsigned int readerId); NtfReader(NtfLogger& ntfLogger, unsigned int readerId, - SaNtfSearchCriteriaT searchCriteria, ntfsv_filter_ptrs_t* f_rec); + ntfsv_reader_init_req_t *req); + NtfReader(NtfLogger& ntfLogger, unsigned int readerId, + ntfsv_reader_init_req_2_t *req); ~NtfReader(); void filterCacheList(NtfLogger& ntfLogger); NtfSmartPtr next(SaNtfSearchDirectionT direction, SaAisErrorT* error); unsigned int getId(); + void setReaderId(unsigned int readerId) { readerId_ = readerId; } + void setReaderIteration(unsigned int iterPos) {ffIter = coll_.begin() + iterPos;} + void setFirstRead(bool firstRead) {firstRead_ = firstRead; } private: readerNotificationListT coll_; @@ -63,7 +67,9 @@ class NtfReader { unsigned int readerId_; SaNtfSearchCriteriaT searchCriteria_; NtfCriteriaFilter* c_filter_; - bool firstRead; + bool firstRead_; + ntfsv_reader_init_req_t read_init_req_; + ntfsv_reader_init_req_2_t read_init_2_req_; }; class NtfCriteriaFilter { diff --git a/src/ntf/ntfd/ntfs_com.c b/src/ntf/ntfd/ntfs_com.c index 544486d..bfef875 100644 --- a/src/ntf/ntfd/ntfs_com.c +++ b/src/ntf/ntfd/ntfs_com.c @@ -23,6 +23,7 @@ */ #include <saAis.h> #include <saNtf.h> +#include "base/ncsencdec_pub.h" #include "ntf/common/ntfsv_msg.h" #include "ntfs_com.h" #include "ntf/common/ntfsv_enc_dec.h" @@ -451,12 +452,30 @@ int sendNoOfNotifications(uint32_t num_rec, NCS_UBAID *uba) return enc_ckpt_reserv_header(uba, NTFS_CKPT_NOTIFICATION, num_rec, 0); } +int sendNoOfCachedNotifications(uint32_t num_rec, NCS_UBAID *uba) +{ + if (ntfs_cb->peer_mbcsv_version > NTFS_MBCSV_VERSION_2) { + TRACE_2("num_rec: %u", num_rec); + return enc_ckpt_reserv_header(uba, NTFS_CKPT_NOTIFICATION, num_rec, 0); + } + return NCSCC_RC_SUCCESS; +} + int sendNoOfSubscriptions(uint32_t num_rec, NCS_UBAID *uba) { TRACE_2("num_rec: %u", num_rec); return enc_ckpt_reserv_header(uba, NTFS_CKPT_SUBSCRIBE, num_rec, 0); } +int sendNoOfReaders(uint32_t num_rec, NCS_UBAID *uba) +{ + if (ntfs_cb->peer_mbcsv_version > NTFS_MBCSV_VERSION_2) { + TRACE_2("num_rec: %u", num_rec); + return enc_ckpt_reserv_header(uba, NTFS_CKPT_READER_INITIALIZE_2, num_rec, 0); + } + return NCSCC_RC_SUCCESS; +} + int sendNewSubscription(ntfsv_subscribe_req_t *s, NCS_UBAID *uba) { TRACE_2("numdiscarded: %u", s->d_info.numberDiscarded); @@ -570,6 +589,66 @@ void sendNotConfirmUpdate(unsigned int clientId, TRACE_LEAVE(); } +void sendReaderInitialize2Update(ntfsv_reader_init_req_2_t* readerInitializeReq) +{ + if (ntfs_cb->peer_mbcsv_version > NTFS_MBCSV_VERSION_2) { + + ntfsv_ckpt_msg_t ckpt; + + memset(&ckpt, 0, sizeof(ckpt)); + ckpt.header.ckpt_rec_type = NTFS_CKPT_READER_INITIALIZE_2; + ckpt.header.num_ckpt_records = 1; + ckpt.header.data_len = 1; + ckpt.ckpt_rec.reader_init_2.arg = *readerInitializeReq; + update_standby(&ckpt, NCS_MBCSV_ACT_ADD); + } +} + +void sendReadNextUpdate(ntfsv_read_next_req_t* readNextReq) +{ + if (ntfs_cb->peer_mbcsv_version > NTFS_MBCSV_VERSION_2) { + + ntfsv_ckpt_msg_t ckpt; + + memset(&ckpt, 0, sizeof(ckpt)); + ckpt.header.ckpt_rec_type = NTFS_CKPT_READ_NEXT; + ckpt.header.num_ckpt_records = 1; + ckpt.header.data_len = 1; + ckpt.ckpt_rec.read_next.arg = *readNextReq; + update_standby(&ckpt, NCS_MBCSV_ACT_ADD); + } +} + +void sendReadFinalizeUpdate(ntfsv_reader_finalize_req_t* readFinalizeReq) +{ + if (ntfs_cb->peer_mbcsv_version > NTFS_MBCSV_VERSION_2) { + + ntfsv_ckpt_msg_t ckpt; + + memset(&ckpt, 0, sizeof(ckpt)); + ckpt.header.ckpt_rec_type = NTFS_CKPT_READ_FINALIZE; + ckpt.header.num_ckpt_records = 1; + ckpt.header.data_len = 1; + ckpt.ckpt_rec.read_finalize.arg = *readFinalizeReq; + update_standby(&ckpt, NCS_MBCSV_ACT_ADD); + } +} + +void sendReaderInitializeUpdate(ntfsv_reader_init_req_t* readerInitializeReq) +{ + if (ntfs_cb->peer_mbcsv_version > NTFS_MBCSV_VERSION_2) { + + ntfsv_ckpt_msg_t ckpt; + + memset(&ckpt, 0, sizeof(ckpt)); + ckpt.header.ckpt_rec_type = NTFS_CKPT_READER_INITIALIZE; + ckpt.header.num_ckpt_records = 1; + ckpt.header.data_len = 1; + ckpt.ckpt_rec.reader_init.arg = *readerInitializeReq; + update_standby(&ckpt, NCS_MBCSV_ACT_ADD); + } +} + /** * @brief Send Membership status of node to a lib on that node. * diff --git a/src/ntf/ntfd/ntfs_com.h b/src/ntf/ntfd/ntfs_com.h index 4e8155e..1c1b283 100644 --- a/src/ntf/ntfd/ntfs_com.h +++ b/src/ntf/ntfd/ntfs_com.h @@ -84,13 +84,17 @@ void subscriptionRemoved(unsigned int clientId, MDS_SYNC_SND_CTXT *mdsCtxt); void syncRequest(NCS_UBAID *uba); int syncFinished(); -void newReader(unsigned int clientId, SaNtfSearchCriteriaT searchCriteria, - ntfsv_filter_ptrs_t *f_rec, MDS_SYNC_SND_CTXT *mdsCtxt); -void readNext(unsigned int clientId, unsigned int readerId, - SaNtfSearchDirectionT searchDirection, - MDS_SYNC_SND_CTXT *mdsCtxt); -void deleteReader(unsigned int clientId, unsigned int readerId, +void createReaderWithoutFilter(ntfsv_reader_init_req_t rp, MDS_SYNC_SND_CTXT *mdsCtxt); +void createReaderWithFilter(ntfsv_reader_init_req_2_t rp, MDS_SYNC_SND_CTXT *mdsCtxt); +void deleteReader(ntfsv_reader_finalize_req_t readFinalizeReq, MDS_SYNC_SND_CTXT *mdsCtxt); +void readNext(ntfsv_read_next_req_t readNextReq, + MDS_SYNC_SND_CTXT *mdsCtxt); +void sendReaderInitialize2Update(ntfsv_reader_init_req_2_t* readerInitializeReq); +void sendReaderInitializeUpdate(ntfsv_reader_init_req_t* readerInitializeReq); +void sendReadNextUpdate(ntfsv_read_next_req_t* readNextReq); +void sendReadFinalizeUpdate(ntfsv_reader_finalize_req_t* readFinalizeReq); +int sendNoOfReaders(uint32_t num_rec, NCS_UBAID *uba); void printAdminInfo(void); void syncGlobals(const struct NtfGlobals *ntfGlobals); @@ -175,6 +179,7 @@ void sendNotConfirmUpdate(unsigned int clientId, SaNtfSubscriptionIdT subscriptionId, SaNtfIdentifierT notificationId, int discarded); int sendNoOfNotifications(uint32_t num_rec, NCS_UBAID *uba); +int sendNoOfCachedNotifications(uint32_t num_rec, NCS_UBAID *uba); int sendNoOfSubscriptions(uint32_t num_rec, NCS_UBAID *uba); int sendNoOfClients(uint32_t num_rec, NCS_UBAID *uba); diff --git a/src/ntf/ntfd/ntfs_evt.c b/src/ntf/ntfd/ntfs_evt.c index 88f58f7..19b2f60 100644 --- a/src/ntf/ntfd/ntfs_evt.c +++ b/src/ntf/ntfd/ntfs_evt.c @@ -512,9 +512,8 @@ static uint32_t proc_reader_initialize_msg(ntfs_cb_t *cb, ntfsv_ntfs_evt_t *evt) } TRACE_4("client_id: %u", reader_initialize_param->client_id); - newReader(reader_initialize_param->client_id, - reader_initialize_param->searchCriteria, NULL, - &evt->mds_ctxt); + createReaderWithoutFilter(*reader_initialize_param, &evt->mds_ctxt); + TRACE_LEAVE(); return rc; } @@ -553,8 +552,7 @@ static uint32_t proc_reader_initialize_msg_2(ntfs_cb_t *cb, return rc; } - newReader(rp->head.client_id, rp->head.searchCriteria, &rp->f_rec, - &evt->mds_ctxt); + createReaderWithFilter(*rp, &evt->mds_ctxt); TRACE_LEAVE(); return rc; } @@ -591,8 +589,7 @@ static uint32_t proc_reader_finalize_msg(ntfs_cb_t *cb, ntfsv_ntfs_evt_t *evt) } TRACE_4("client_id: %u", reader_finalize_param->client_id); - deleteReader(reader_finalize_param->client_id, - reader_finalize_param->readerId, &evt->mds_ctxt); + deleteReader(*reader_finalize_param, &evt->mds_ctxt); /* if (ais_rv == SA_AIS_OK) */ /* { */ @@ -638,8 +635,7 @@ static uint32_t proc_read_next_msg(ntfs_cb_t *cb, ntfsv_ntfs_evt_t *evt) return rc; } - readNext(read_next_param->client_id, read_next_param->readerId, - read_next_param->searchDirection, &evt->mds_ctxt); + readNext(*read_next_param, &evt->mds_ctxt); /* if (ais_rv == SA_AIS_OK) */ /* { */ /* async_rc = ntfs_subscription_initialize_async_update(cb, */ diff --git a/src/ntf/ntfd/ntfs_mbcsv.c b/src/ntf/ntfd/ntfs_mbcsv.c index 4afd5e6..cdf2aa5 100644 --- a/src/ntf/ntfd/ntfs_mbcsv.c +++ b/src/ntf/ntfd/ntfs_mbcsv.c @@ -55,6 +55,10 @@ static uint32_t ckpt_proc_not_log_confirm(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data); static uint32_t ckpt_proc_not_send_confirm(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data); +static uint32_t ckpt_proc_reader_init(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data); +static uint32_t ckpt_proc_reader_init_2(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data); +static uint32_t ckpt_proc_read_next(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data); +static uint32_t ckpt_proc_read_finalize(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data); static void enc_ckpt_header(uint8_t *pdata, ntfsv_ckpt_header_t header); static uint32_t dec_ckpt_header(NCS_UBAID *uba, ntfsv_ckpt_header_t *header); @@ -95,7 +99,11 @@ static NTFS_CKPT_HDLR ckpt_data_handler[NTFS_CKPT_MSG_MAX] = { ckpt_proc_subscribe, ckpt_proc_unsubscribe, ckpt_proc_not_log_confirm, - ckpt_proc_not_send_confirm}; + ckpt_proc_not_send_confirm, + ckpt_proc_reader_init, + ckpt_proc_reader_init_2, + ckpt_proc_read_next, + ckpt_proc_read_finalize}; /**************************************************************************** * Name : ntfsv_mbcsv_init @@ -556,6 +564,10 @@ static uint32_t ckpt_encode_async_update(ntfs_cb_t *ntfs_cb, EDU_HDL edu_hdl, ntfs_ckpt_reg_msg_t ckpt_reg_rec; ntfs_ckpt_subscribe_t subscribe_rec; ntfs_ckpt_unsubscribe_t unsubscribe_rec; + ntfs_ckpt_reader_initialize_t reader_init_rec; + ntfs_ckpt_reader_initialize_2_t reader_init_rec2; + ntfs_ckpt_read_next_t read_next_rec; + ntfs_ckpt_read_finalize_t read_finalize_rec; EDU_ERR ederror; case NTFS_CKPT_INITIALIZE_REC: @@ -640,6 +652,46 @@ static uint32_t ckpt_encode_async_update(ntfs_cb_t *ntfs_cb, EDU_HDL edu_hdl, rc = enc_mbcsv_send_confirm_msg(uba, &data->ckpt_rec.send_confirm); break; + case NTFS_CKPT_READER_INITIALIZE: + TRACE("Async update NTFS_CKPT_READER_INITIALIZE"); + ckpt_hdr.ckpt_rec_type = NTFS_CKPT_READER_INITIALIZE; + ckpt_hdr.num_ckpt_records = 1; + ckpt_hdr.data_len = 0; /*Not in Use for Cold Sync */ + enc_ckpt_header(pheader, ckpt_hdr); + + reader_init_rec.arg = data->ckpt_rec.reader_init.arg; + rc = ntfsv_enc_reader_initialize_msg(uba, &reader_init_rec.arg); + break; + case NTFS_CKPT_READER_INITIALIZE_2: + TRACE("Async update NTFS_CKPT_READER_INITIALIZE_2"); + ckpt_hdr.ckpt_rec_type = NTFS_CKPT_READER_INITIALIZE_2; + ckpt_hdr.num_ckpt_records = 1; + ckpt_hdr.data_len = 0; /*Not in Use for Cold Sync */ + enc_ckpt_header(pheader, ckpt_hdr); + + reader_init_rec2.arg = data->ckpt_rec.reader_init_2.arg; + rc = ntfsv_enc_reader_initialize_2_msg(uba, &reader_init_rec2.arg); + break; + case NTFS_CKPT_READ_NEXT: + TRACE("Async update NTFS_CKPT_READ_NEXT"); + ckpt_hdr.ckpt_rec_type = NTFS_CKPT_READ_NEXT; + ckpt_hdr.num_ckpt_records = 1; + ckpt_hdr.data_len = 0; /*Not in Use for Cold Sync */ + enc_ckpt_header(pheader, ckpt_hdr); + + read_next_rec.arg = data->ckpt_rec.read_next.arg; + rc = ntfsv_enc_read_next_msg(uba, &read_next_rec.arg); + break; + case NTFS_CKPT_READ_FINALIZE: + TRACE("Async update NTFS_CKPT_READ_FINALIZE"); + ckpt_hdr.ckpt_rec_type = NTFS_CKPT_READ_FINALIZE; + ckpt_hdr.num_ckpt_records = 1; + ckpt_hdr.data_len = 0; /*Not in Use for Cold Sync */ + enc_ckpt_header(pheader, ckpt_hdr); + + read_finalize_rec.arg = data->ckpt_rec.read_finalize.arg; + rc = ntfsv_enc_read_finalize_msg(uba, &read_finalize_rec.arg); + break; default: TRACE_3("FAILED no type: %d", data->header.ckpt_rec_type); break; @@ -764,6 +816,10 @@ static uint32_t ckpt_decode_async_update(ntfs_cb_t *cb, ntfs_ckpt_subscribe_t *subscribe_rec = NULL; ntfs_ckpt_unsubscribe_t *unsubscribe_rec = NULL; ntfsv_ckpt_finalize_msg_t *finalize = NULL; + ntfs_ckpt_reader_initialize_t *reader_init_rec = NULL; + ntfs_ckpt_reader_initialize_2_t *reader_init_rec2 = NULL; + ntfs_ckpt_read_next_t *read_next = NULL; + ntfs_ckpt_read_finalize_t *read_finalize = NULL; MDS_DEST *agent_dest = NULL; TRACE_ENTER(); @@ -858,7 +914,34 @@ static uint32_t ckpt_decode_async_update(ntfs_cb_t *cb, goto done; } break; + case NTFS_CKPT_READER_INITIALIZE: + TRACE_2("READER INIT: AUPDATE"); + reader_init_rec = &ckpt_msg->ckpt_rec.reader_init; + + rc = ntfsv_dec_reader_initialize_msg(&cbk_arg->info.decode.i_uba, + &reader_init_rec->arg); + break; + case NTFS_CKPT_READER_INITIALIZE_2: + TRACE_2("READER INIT2: AUPDATE"); + reader_init_rec2 = &ckpt_msg->ckpt_rec.reader_init_2; + + rc = ntfsv_dec_reader_initialize_2_msg(&cbk_arg->info.decode.i_uba, + &reader_init_rec2->arg); + break; + case NTFS_CKPT_READ_NEXT: + TRACE_2("READ NEXT: AUPDATE"); + read_next = &ckpt_msg->ckpt_rec.read_next; + + rc = ntfsv_dec_read_next_msg(&cbk_arg->info.decode.i_uba, + &read_next->arg); + break; + case NTFS_CKPT_READ_FINALIZE: + TRACE_2("READ FINALIZE: AUPDATE"); + read_finalize = &ckpt_msg->ckpt_rec.read_finalize; + rc = ntfsv_dec_read_finalize_msg(&cbk_arg->info.decode.i_uba, + &read_finalize->arg); + break; default: rc = NCSCC_RC_FAILURE; TRACE(" FAILED"); @@ -1388,6 +1471,61 @@ uint32_t ckpt_proc_subscribe(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data) return NCSCC_RC_SUCCESS; } +uint32_t ckpt_proc_reader_init(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data) +{ + + TRACE_ENTER(); + + ntfsv_reader_init_req_t *rp = &data->ckpt_rec.reader_init.arg; + TRACE_4("client_id: %u", rp->client_id); + + createReaderWithoutFilter(*rp, NULL); + TRACE_LEAVE(); + return NCSCC_RC_SUCCESS; +} + +uint32_t ckpt_proc_reader_init_2(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data) +{ + + TRACE_ENTER(); + + ntfsv_reader_init_req_2_t *rp = &data->ckpt_rec.reader_init_2.arg; + TRACE_4("client_id: %u", rp->head.client_id); + + createReaderWithFilter(*rp, NULL); + TRACE_LEAVE(); + return NCSCC_RC_SUCCESS; +} + +uint32_t ckpt_proc_read_next(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data) +{ + + TRACE_ENTER(); + + ntfsv_read_next_req_t *rp = &data->ckpt_rec.read_next.arg; + TRACE_4("client_id: %u", rp->client_id); + + readNext(*rp, NULL); + TRACE_LEAVE(); + return NCSCC_RC_SUCCESS; +} + +uint32_t ckpt_proc_read_finalize(ntfs_cb_t *cb, ntfsv_ckpt_msg_t *data) +{ + uint32_t rc = NCSCC_RC_SUCCESS; + + TRACE_ENTER(); + ntfsv_reader_finalize_req_t *reader_finalize_param = + &data->ckpt_rec.read_finalize.arg; + + + TRACE_4("client_id: %u", reader_finalize_param->client_id); + deleteReader(*reader_finalize_param, NULL); + TRACE_LEAVE(); + return rc; +} + + /**************************************************************************** * Name : ckpt_proc_finalize_rec * diff --git a/src/ntf/ntfd/ntfs_mbcsv.h b/src/ntf/ntfd/ntfs_mbcsv.h index b6bb0e0..3286ec8 100644 --- a/src/ntf/ntfd/ntfs_mbcsv.h +++ b/src/ntf/ntfd/ntfs_mbcsv.h @@ -41,6 +41,10 @@ typedef enum ntfsv_ckpt_rec_type { NTFS_CKPT_UNSUBSCRIBE = 6, NTFS_CKPT_NOT_LOG_CONFIRM = 7, NTFS_CKPT_NOT_SEND_CONFIRM = 8, + NTFS_CKPT_READER_INITIALIZE = 9, + NTFS_CKPT_READER_INITIALIZE_2 = 10, + NTFS_CKPT_READ_NEXT = 11, + NTFS_CKPT_READ_FINALIZE = 12, NTFS_CKPT_MSG_MAX } ntfsv_ckpt_msg_type_t; @@ -71,6 +75,14 @@ typedef struct { ntfsv_subscribe_req_t arg; } ntfs_ckpt_subscribe_t; typedef struct { ntfsv_unsubscribe_req_t arg; } ntfs_ckpt_unsubscribe_t; +typedef struct { ntfsv_reader_init_req_t arg; } ntfs_ckpt_reader_initialize_t; + +typedef struct { ntfsv_reader_init_req_2_t arg; } ntfs_ckpt_reader_initialize_2_t; + +typedef struct { ntfsv_read_next_req_t arg; } ntfs_ckpt_read_next_t; + +typedef struct { ntfsv_reader_finalize_req_t arg; } ntfs_ckpt_read_finalize_t; + typedef struct { SaNtfIdentifierT notificationId; } ntfs_ckpt_not_log_confirm_t; typedef struct { @@ -101,6 +113,10 @@ typedef struct { ntfs_ckpt_unsubscribe_t unsubscribe; ntfs_ckpt_not_log_confirm_t log_confirm; ntfs_ckpt_not_send_confirm_t send_confirm; + ntfs_ckpt_reader_initialize_t reader_init; + ntfs_ckpt_reader_initialize_2_t reader_init_2; + ntfs_ckpt_read_next_t read_next; + ntfs_ckpt_read_finalize_t read_finalize; } ckpt_rec; } ntfsv_ckpt_msg_t; diff --git a/src/ntf/ntfd/ntfs_mds.c b/src/ntf/ntfd/ntfs_mds.c index 59b7730..a4b4a5f 100644 --- a/src/ntf/ntfd/ntfs_mds.c +++ b/src/ntf/ntfd/ntfs_mds.c @@ -181,14 +181,15 @@ static uint32_t dec_send_not_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) ******************************************************************************/ static uint32_t dec_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - return ntfsv_dec_reader_initialize_msg(uba, msg); + return ntfsv_dec_reader_initialize_msg(uba, + &msg->info.api_info.param.reader_init); } /**************************************************************************** - Name : dec_reader_initialize_msg_2 + Name : dec_reader_initialize_2_msg Description : This routine decodes an reader_initialize API msg - filter is added in reader_initialize_msg_2 + filter is added in reader_initialize_2_msg Arguments : NCS_UBAID *msg, NTFSV_MSG *msg @@ -197,9 +198,10 @@ static uint32_t dec_reader_initialize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) Notes : None. ******************************************************************************/ -static uint32_t dec_reader_initialize_msg_2(NCS_UBAID *uba, ntfsv_msg_t *msg) +static uint32_t dec_reader_initialize_2_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - return ntfsv_dec_reader_initialize_msg_2(uba, msg); + return ntfsv_dec_reader_initialize_2_msg(uba, + &msg->info.api_info.param.reader_init_2); } /**************************************************************************** @@ -216,19 +218,7 @@ static uint32_t dec_reader_initialize_msg_2(NCS_UBAID *uba, ntfsv_msg_t *msg) ******************************************************************************/ static uint32_t dec_reader_finalize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - uint8_t *p8; - ntfsv_reader_finalize_req_t *param = - &msg->info.api_info.param.reader_finalize; - uint8_t local_data[8]; - - /* releaseCode, majorVersion, minorVersion */ - p8 = ncs_dec_flatten_space(uba, local_data, 8); - param->client_id = ncs_decode_32bit(&p8); - param->readerId = ncs_decode_32bit(&p8); - ncs_dec_skip_space(uba, 8); - - TRACE_8("NTFSV_reader_finalize_REQ"); - return NCSCC_RC_SUCCESS; + return ntfsv_dec_read_finalize_msg(uba, &msg->info.api_info.param.reader_finalize); } /**************************************************************************** @@ -245,19 +235,7 @@ static uint32_t dec_reader_finalize_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) ******************************************************************************/ static uint32_t dec_read_next_msg(NCS_UBAID *uba, ntfsv_msg_t *msg) { - uint8_t *p8; - ntfsv_read_next_req_t *param = &msg->info.api_info.param.read_next; - uint8_t local_data[10]; - - /* releaseCode, majorVersion, minorVersion */ - p8 = ncs_dec_flatten_space(uba, local_data, 10); - param->client_id = ncs_decode_32bit(&p8); - param->searchDirection = ncs_decode_8bit(&p8); - param->readerId = ncs_decode_32bit(&p8); - ncs_dec_skip_space(uba, 10); - - TRACE_8("NTFSV_read_next_REQ"); - return NCSCC_RC_SUCCESS; + return ntfsv_dec_read_next_msg(uba, &msg->info.api_info.param.read_next); } /**************************************************************************** @@ -749,7 +727,7 @@ static uint32_t mds_dec(struct ncsmds_callback_info *info) rc = dec_reader_initialize_msg(uba, &evt->info.msg); break; case NTFSV_READER_INITIALIZE_REQ_2: - rc = dec_reader_initialize_msg_2(uba, &evt->info.msg); + rc = dec_reader_initialize_2_msg(uba, &evt->info.msg); break; case NTFSV_READER_FINALIZE_REQ: rc = dec_reader_finalize_msg(uba, &evt->info.msg); -- 2.7.4 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel