Repository: incubator-hawq Updated Branches: refs/heads/master dada9ba99 -> decbe0deb
HAWQ-966. Adjust libyarn output log messages. Change some log message level to DEBUG1. Property 'yarn.client.log.severity' in yarn-client.xml can be used to change the output log level, default value is 'INFO', others can be 'DEBUG1', 'WARNING', etc. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/decbe0de Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/decbe0de Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/decbe0de Branch: refs/heads/master Commit: decbe0debdd024f759b06f83fb181a67d934c75e Parents: dada9ba Author: Wen Lin <w...@pivotal.io> Authored: Mon Aug 1 10:22:25 2016 +0800 Committer: Wen Lin <w...@pivotal.io> Committed: Mon Aug 1 10:22:25 2016 +0800 ---------------------------------------------------------------------- .../src/libyarnclient/ApplicationClient.cpp | 13 +- .../src/libyarnclient/ApplicationMaster.cpp | 13 +- .../src/libyarnclient/ContainerManagement.cpp | 8 +- .../libyarn/src/libyarnclient/LibYarnClient.cpp | 965 ++++++++++--------- 4 files changed, 506 insertions(+), 493 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/ApplicationClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp index 64acbe9..819514f 100644 --- a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp +++ b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp @@ -50,7 +50,7 @@ std::vector<RMInfo> RMInfo::getHARMInfo(const Yarn::Config & conf, const char* n retval[i].setPort(rm[1]); } } catch (const Yarn::YarnConfigNotFound &e) { - LOG(INFO, "Yarn RM HA is not configured."); + LOG(DEBUG1, "Yarn RM HA is not configured."); } return retval; @@ -63,6 +63,7 @@ ApplicationClient::ApplicationClient(string &user, string &host, string &port) { Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig(); Yarn::Internal::SessionConfig sessionConfig(*conf); + RootLogger.setLogSeverity(sessionConfig.getLogSeverity()); LOG(INFO, "ApplicationClient session auth method : %s", sessionConfig.getRpcAuthMethod().c_str()); @@ -135,7 +136,7 @@ std::shared_ptr<ApplicationClientProtocol> ApplicationClient::getActiveAppClientProto(uint32_t & oldValue) { lock_guard<mutex> lock(this->mut); - LOG(INFO, "ApplicationClient::getActiveAppClientProto is called."); + LOG(DEBUG2, "ApplicationClient::getActiveAppClientProto is called."); if (appClientProtos.empty()) { LOG(WARNING, "The vector of ApplicationClientProtocol is empty."); @@ -143,7 +144,8 @@ std::shared_ptr<ApplicationClientProtocol> } oldValue = currentAppClientProto; - LOG(INFO, "ApplicationClient::getActiveAppClientProto, current is %d.", currentAppClientProto); + LOG(DEBUG1, "ApplicationClient::getActiveAppClientProto, current is %d.", + currentAppClientProto); return appClientProtos[currentAppClientProto % appClientProtos.size()]; } @@ -156,12 +158,13 @@ void ApplicationClient::failoverToNextAppClientProto(uint32_t oldValue){ ++currentAppClientProto; currentAppClientProto = currentAppClientProto % appClientProtos.size(); - LOG(INFO, "ApplicationClient::failoverToNextAppClientProto, current is %d.", currentAppClientProto); + LOG(INFO, "ApplicationClient::failoverToNextAppClientProto, current is %d.", + currentAppClientProto); } static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e) { try { - Yarn::rethrow_if_nested(e); + Yarn::rethrow_if_nested(e); } catch (...) { NESTED_THROW(Yarn::YarnRpcException, "%s", e.what()); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp index 08d8cdb..964ac0e 100644 --- a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp +++ b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp @@ -86,8 +86,9 @@ ApplicationMaster::ApplicationMaster(string &schedHost, string &schedPort, std::shared_ptr<ApplicationMasterProtocol>( new ApplicationMasterProtocol(rmInfos[i].getHost(), rmInfos[i].getPort(), tokenService, sessionConfig, rpcAuth))); - LOG(INFO, "ApplicationMaster finds a candidate RM scheduler, host:%s, port:%s", - rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str()); + LOG(INFO, + "ApplicationMaster finds a candidate RM scheduler, host:%s, port:%s", + rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str()); } } currentAppMasterProto = 0; @@ -111,7 +112,8 @@ std::shared_ptr<ApplicationMasterProtocol> } oldValue = currentAppMasterProto; - LOG(INFO, "ApplicationMaster::getActiveAppMasterProto, current is %d.", currentAppMasterProto); + LOG(DEBUG2, "ApplicationMaster::getActiveAppMasterProto, current is %d.", + currentAppMasterProto); return appMasterProtos[currentAppMasterProto % appMasterProtos.size()]; } @@ -124,12 +126,13 @@ void ApplicationMaster::failoverToNextAppMasterProto(uint32_t oldValue){ ++currentAppMasterProto; currentAppMasterProto = currentAppMasterProto % appMasterProtos.size(); - LOG(INFO, "ApplicationMaster::failoverToNextAppMasterProto, current is %d.", currentAppMasterProto); + LOG(INFO, "ApplicationMaster::failoverToNextAppMasterProto, current is %d.", + currentAppMasterProto); } static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e) { try { - Yarn::rethrow_if_nested(e); + Yarn::rethrow_if_nested(e); } catch (...) { NESTED_THROW(Yarn::YarnRpcException, "%s", e.what()); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/ContainerManagement.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/ContainerManagement.cpp b/depends/libyarn/src/libyarnclient/ContainerManagement.cpp index 915f518..b999988 100644 --- a/depends/libyarn/src/libyarnclient/ContainerManagement.cpp +++ b/depends/libyarn/src/libyarnclient/ContainerManagement.cpp @@ -67,7 +67,7 @@ StartContainerResponse ContainerManagement::startContainer(Container &container, oss << container.getNodeId().getPort(); string port(oss.str()); - LOG(INFO, + LOG(DEBUG1, "ContainerManagement::startContainer, is going to connect to NM [%s:%s] to start container", host.c_str(), port.c_str()); @@ -101,7 +101,7 @@ StartContainerResponse ContainerManagement::startContainer(Container &container, StartContainerResponse scResponse; scResponse.setServicesMetaData(scsResponse.getServicesMetaData()); - LOG(INFO, + LOG(DEBUG1, "ContainerManagement::startContainer, after start a container, id:%ld on NM [%s:%s]", container.getId().getId(), host.c_str(), port.c_str()); @@ -131,7 +131,7 @@ void ContainerManagement::stopContainer(Container &container, Token &nmToken) { oss << container.getNodeId().getPort(); string port(oss.str()); - LOG(INFO, + LOG(DEBUG1, "ContainerManagement::stopContainer, is going to connect to NM [%s:%s] to stop container", host.c_str(), port.c_str()); @@ -174,7 +174,7 @@ ContainerStatus ContainerManagement::getContainerStatus(Container &container, oss << container.getNodeId().getPort(); string port(oss.str()); - LOG(INFO, + LOG(DEBUG1, "ContainerManagement, is going to connect to NM [%s:%s] to getContainerStatus container", host.c_str(), port.c_str()); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/LibYarnClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp index 823d116..5fa91e7 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp +++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp @@ -37,132 +37,130 @@ using namespace Yarn::Internal; namespace libyarn { LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort, - string &schedHost, string &schedPort, string &amHost, - int32_t amPort, string &am_tracking_url,int heartbeatInterval) : - amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(amHost), - amPort(amPort), am_tracking_url(am_tracking_url), - heartbeatInterval(heartbeatInterval),response_id(0),clientJobId(""), - keepRun(false), needHeartbeatAlive(false){ - pthread_mutex_init( &(heartbeatLock), NULL ); - - amrmClient = NULL; - appClient = (void*) new ApplicationClient(user, rmHost, rmPort); - nmClient = (void*) new ContainerManagement(); + string &schedHost, string &schedPort, string &amHost, int32_t amPort, + string &am_tracking_url, int heartbeatInterval) : + amUser(user), schedHost(schedHost), schedPort(schedPort), amHost( + amHost), amPort(amPort), am_tracking_url(am_tracking_url), heartbeatInterval( + heartbeatInterval), response_id(0), clientJobId(""), keepRun( + false), needHeartbeatAlive(false) { + pthread_mutex_init(&(heartbeatLock), NULL); + amrmClient = NULL; + appClient = (void*) new ApplicationClient(user, rmHost, rmPort); + nmClient = (void*) new ContainerManagement(); } #ifdef MOCKTEST LibYarnClient::LibYarnClient(string &user,string &rmHost, string &rmPort, string &schedHost, - string &schedPort, string &amHost, int32_t amPort, - string &am_tracking_url, int heartbeatInterval,Mock::TestLibYarnClientStub *stub): - amUser(user),schedHost(schedHost), schedPort(schedPort), amHost(amHost), - amPort(amPort), am_tracking_url(am_tracking_url), - heartbeatInterval(heartbeatInterval),clientJobId(""), - keepRun(false), needHeartbeatAlive(false){ - pthread_mutex_init( &(heartbeatLock), NULL ); - libyarnStub = stub; - appClient = (void*) libyarnStub->getApplicationClient(); - amrmClient = (void*) libyarnStub->getApplicationMaster(); - nmClient = (void*) libyarnStub->getContainerManagement(); + string &schedPort, string &amHost, int32_t amPort, + string &am_tracking_url, int heartbeatInterval,Mock::TestLibYarnClientStub *stub): + amUser(user),schedHost(schedHost), schedPort(schedPort), amHost(amHost), + amPort(amPort), am_tracking_url(am_tracking_url), + heartbeatInterval(heartbeatInterval),clientJobId(""), + keepRun(false), needHeartbeatAlive(false) { + pthread_mutex_init( &(heartbeatLock), NULL ); + libyarnStub = stub; + appClient = (void*) libyarnStub->getApplicationClient(); + amrmClient = (void*) libyarnStub->getApplicationMaster(); + nmClient = (void*) libyarnStub->getContainerManagement(); } #endif LibYarnClient::~LibYarnClient() { #ifndef MOCKTEST - if ( keepRun ) { - // No need to run heart-beat thread now. - keepRun = false; - void *thrc = NULL; - int rc = pthread_join(heartbeatThread, &thrc); - if ( rc != 0 ) { - LOG(INFO, "LibYarnClient::~LibYarnClient, fail to join heart-beat thread. " - "error code %d", rc); - } - else { - LOG(INFO, "LibYarnClient::~LibYarnClient, join heart-beat thread successfully."); - } - } + if (keepRun) { + // No need to run heart-beat thread now. + keepRun = false; + void *thrc = NULL; + int rc = pthread_join(heartbeatThread, &thrc); + if (rc != 0) { + LOG(DEBUG1, "LibYarnClient::~LibYarnClient, fail to join heart-beat thread. " + "error code %d", rc); + } else { + LOG(DEBUG1, "LibYarnClient::~LibYarnClient, join heart-beat thread successfully."); + } + } #endif - if (amrmClient != NULL){ - delete (ApplicationMaster*)amrmClient; - } - delete (ApplicationClient*)appClient; - delete (ContainerManagement*)nmClient; + if (amrmClient != NULL) { + delete (ApplicationMaster*) amrmClient; + } + delete (ApplicationClient*) appClient; + delete (ContainerManagement*) nmClient; } string LibYarnClient::getErrorMessage() { return errorMessage; } -void LibYarnClient::setErrorMessage(string errorMsg){ +void LibYarnClient::setErrorMessage(string errorMsg) { errorMessage = errorMsg; } bool LibYarnClient::isJobHealthy() { - return keepRun; + return keepRun; } list<ResourceRequest>& LibYarnClient::getAskRequests() { - return askRequests; + return askRequests; } void LibYarnClient::clearAskRequests() { - LOG(INFO, "LibYarnClient::clear ask requests."); - askRequests.clear(); + LOG(DEBUG1, "LibYarnClient::clear ask requests."); + askRequests.clear(); } void* heartbeatFunc(void* args) { - int failcounter = 0; - int retry = 2; - LibYarnClient *client = (LibYarnClient*)args; - - while (client->keepRun) { - try { - client->dummyAllocate(); - failcounter = 0; - } catch (const ApplicationMasterNotRegisteredException &e) { - /* - * In case catch this exception, - * heartbeat thread should exits, and re-register AM. - */ - LOG(WARNING, "LibYarnClient::heartbeatFunc, dummy allocation " - "catch ApplicationMasterNotRegisteredException. %s", - e.msg()); - client->keepRun = false; - break; - } catch (const YarnException &e) { - LOG(WARNING, "LibYarnClient::heartbeatFunc, dummy allocation " - "is not correctly executed with exception raised. %s", - e.msg()); - failcounter++; - if ( failcounter > retry ) { - /* In case retry too many times with errors/exceptions, this - * thread will return. LibYarn has to re-register application - * and start the heartbeat thread again. - */ - LOG(WARNING, "LibYarnClient::heartbeatFunc, there are too many " - "failures raised. This heart-beat thread exits now."); - client->keepRun = false; - break; - } - } - usleep((client->heartbeatInterval) * 1000); - } - - LOG(INFO, "LibYarnClient::heartbeatFunc, goes into exit phase."); - return (void *)0; + int failcounter = 0; + int retry = 2; + LibYarnClient *client = (LibYarnClient*) args; + + while (client->keepRun) { + try { + client->dummyAllocate(); + failcounter = 0; + } catch (const ApplicationMasterNotRegisteredException &e) { + /* + * In case catch this exception, + * heartbeat thread should exits, and re-register AM. + */ + LOG(WARNING, "LibYarnClient::heartbeat dummy allocation " + "catch ApplicationMasterNotRegisteredException. %s", + e.msg()); + client->keepRun = false; + break; + } catch (const YarnException &e) { + LOG(WARNING, "LibYarnClient::heartbeat dummy allocation " + "is not correctly executed with exception raised. %s", + e.msg()); + failcounter++; + if (failcounter > retry) { + /* In case retry too many times with errors/exceptions, this + * thread will return. LibYarn has to re-register application + * and start the heartbeat thread again. + */ + LOG(WARNING, "LibYarnClient::heartbeatFunc, there are too many " + "failures raised. This heart-beat thread exits now."); + client->keepRun = false; + break; + } + } + usleep((client->heartbeatInterval) * 1000); + } + + LOG(INFO, "LibYarnClient::heartbeatFunc, goes into exit phase."); + return (void *) 0; } -int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) { - try{ - //Only one jobId for the client right now. - if (clientJobId != ""){ - throw std::invalid_argument( "Exist an application for the client"); - } - ApplicationClient *applicationClient = (ApplicationClient*)appClient; +int LibYarnClient::createJob(string &jobName, string &queue, string &jobId) { + try { + //Only one jobId for the client right now. + if (clientJobId != ""){ + throw std::invalid_argument( "Exist an application for the client"); + } + ApplicationClient *applicationClient = (ApplicationClient*) appClient; //1. getNewApplication ApplicationId appId = applicationClient->getNewApplication(); - LOG(INFO, "LibYarnClient::createJob, getNewApplication finished, appId:[clusterTimeStamp:%lld,id:%d]", - appId.getClusterTimestamp(), appId.getId()); + LOG(DEBUG1, "LibYarnClient::createJob, getNewApplication finished, appId:[clusterTimeStamp:%lld,id:%d]", + appId.getClusterTimestamp(), appId.getId()); //2. submitApplication ApplicationSubmissionContext appSubmitCtx; @@ -180,16 +178,17 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) { appSubmitCtx.setMaxAppAttempts(1); applicationClient->submitApplication(appSubmitCtx); - LOG(INFO, "LibYarnClient::createJob, submitApplication finished"); + LOG(DEBUG1, "LibYarnClient::createJob, submitApplication finished"); //3. wait util AM is ACCEPTED and return the AMRMToken ApplicationReport report; int retry = 10; while (retry > 0) { report = applicationClient->getApplicationReport(appId); - LOG(INFO,"LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d], appState:%d", - appId.getClusterTimestamp(), appId.getId(), report.getYarnApplicationState()); - if ((report.getAMRMToken().getPassword() != "") && report.getYarnApplicationState() == YarnApplicationState::ACCEPTED) { + LOG(DEBUG1, "LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d], appState:%d", + appId.getClusterTimestamp(), appId.getId(), report.getYarnApplicationState()); + if ((report.getAMRMToken().getPassword() != "") && + report.getYarnApplicationState() == YarnApplicationState::ACCEPTED) { break; } else { retry--; @@ -205,20 +204,21 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) { //4.1 new ApplicationMaster Token token = report.getAMRMToken(); UserInfo user; - if (applicationClient->getMethod() == SIMPLE) - user = UserInfo::LocalUser(); - else if (applicationClient->getMethod() == KERBEROS) { - user.setEffectiveUser(applicationClient->getPrincipal()); - user.setRealUser(applicationClient->getUser()); - } else { - LOG(WARNING, "LibYarnClient::createJob: unsupported RPC method:%d. ", applicationClient->getMethod()); - } - - Yarn::Token AMToken; - AMToken.setIdentifier(token.getIdentifier()); - AMToken.setKind(token.getKind()); - AMToken.setPassword(token.getPassword()); - AMToken.setService(token.getService()); + if (applicationClient->getMethod() == SIMPLE) { + user = UserInfo::LocalUser(); + } else if (applicationClient->getMethod() == KERBEROS) { + user.setEffectiveUser(applicationClient->getPrincipal()); + user.setRealUser(applicationClient->getUser()); + } else { + LOG(WARNING, "LibYarnClient::createJob: unsupported RPC method:%d. ", + applicationClient->getMethod()); + } + + Yarn::Token AMToken; + AMToken.setIdentifier(token.getIdentifier()); + AMToken.setKind(token.getKind()); + AMToken.setPassword(token.getPassword()); + AMToken.setService(token.getService()); user.addToken(AMToken); #ifndef MOCKTEST @@ -227,60 +227,60 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) { #endif //4.2 register to RM scheduler as AM - ((ApplicationMaster*) amrmClient)->registerApplicationMaster(amHost, amPort,am_tracking_url); - LOG(INFO, "LibYarnClient::createJob, registerApplicationMaster finished"); + ((ApplicationMaster*) amrmClient)->registerApplicationMaster(amHost, amPort, am_tracking_url); + LOG(DEBUG1, "LibYarnClient::createJob, registerApplicationMaster finished"); #ifndef MOCKTEST keepRun = true; //5. setup the heartbeat thread to allocate, release, heartbeat int rc = pthread_create(&heartbeatThread, NULL, heartbeatFunc, this); - if ( rc != 0 ) { + if (rc != 0) { keepRun = false; - LOG(INFO, "LibYarnClient::createJob, fail to create heart-beat thread. " - "error code %d", rc); - throw std::runtime_error( "Fail to create heart-beat thread."); + LOG(WARNING, "LibYarnClient::createJob, fail to create heart-beat thread. " + "error code %d", rc); + throw std::runtime_error("Fail to create heart-beat thread."); } needHeartbeatAlive = true; #endif - LOG(INFO,"LibYarnClient::createJob, after AM register to RM, a heartbeat thread has been started"); + LOG(DEBUG1, "LibYarnClient::createJob, after AM register to RM, a heartbeat thread has been started"); //6. return jobId stringstream ss; ss << "job_" << appId.getClusterTimestamp() << "_" << appId.getId(); jobId = ss.str(); - LOG(INFO,"LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d]", - clientAppId.getClusterTimestamp(), clientAppId.getId()); + LOG(INFO, "LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d]", + clientAppId.getClusterTimestamp(), clientAppId.getId()); clientJobId = jobId; return FR_SUCCEEDED; } catch (const YarnNetworkConnectException &e) { - stringstream errorMsg; - errorMsg << "LibYarnClient::createJob, catch network connection exception:" << e.what(); - setErrorMessage(errorMsg.str()); - return FR_FAILED; - } catch (const std::exception &e) { + stringstream errorMsg; + errorMsg << "LibYarnClient::createJob, catch network connection exception:" << e.what(); + setErrorMessage(errorMsg.str()); + return FR_FAILED; + } catch (const std::exception &e) { stringstream errorMsg; errorMsg << "LibYarnClient::createJob, catch exception:" << e.what(); setErrorMessage(errorMsg.str()); return FR_FAILED; } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::createJob, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - return FR_FAILED; + stringstream errorMsg; + errorMsg << "LibYarnClient::createJob, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + return FR_FAILED; } } int LibYarnClient::forceKillJob(string &jobId) { #ifndef MOCKTEST - if ( keepRun ) { + if (keepRun) { keepRun = false; void *thrc = NULL; int rc = pthread_join(heartbeatThread, &thrc); - if ( rc != 0 ) { - LOG(INFO, "LibYarnClient::forceKillJob, fail to join heart-beat thread. " - "error code %d", rc); + if (rc != 0) { + LOG(WARNING, "LibYarnClient::forceKillJob, fail to join heart-beat thread. " + "error code %d", rc); return FR_FAILED; } else { LOG(INFO, "LibYarnClient::forceKillJob, join heart-beat thread successfully."); @@ -294,27 +294,30 @@ int LibYarnClient::forceKillJob(string &jobId) { } needHeartbeatAlive = false; - for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { + for (map<int64_t, Container*>::iterator it = jobIdContainers.begin(); + it != jobIdContainers.end(); it++) { ostringstream key; Container *container = it->second; key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort(); Token nmToken = nmTokenCache[key.str()]; - ((ContainerManagement*)nmClient)->stopContainer((*container), nmToken); - LOG(INFO,"LibYarnClient::forceKillJob, container:%ld is stopped",container->getId().getId()); + ((ContainerManagement*) nmClient)->stopContainer((*container), nmToken); + LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld is stopped",container->getId().getId()); } ((ApplicationClient*) appClient)->forceKillApplication(clientAppId); - LOG(INFO, "LibYarnClient::forceKillJob, forceKillApplication"); + LOG(INFO, "LibYarnClient::force to kill this application."); - for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) { - LOG(INFO,"LibYarnClient::forceKillJob, container:%ld in jobIdContainers is deleted",it->second->getId().getId()); + for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); + it != jobIdContainers.end(); it++) { + LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld in jobIdContainers is deleted", + it->second->getId().getId()); delete it->second; it->second = NULL; } jobIdContainers.clear(); activeFailContainerIds.clear(); return FR_SUCCEEDED; - } catch(std::exception& e){ + } catch (std::exception& e) { stringstream errorMsg; errorMsg << "LibYarnClient::forceKillJob, catch the exception:" << e.what(); setErrorMessage(errorMsg.str()); @@ -331,82 +334,77 @@ int LibYarnClient::forceKillJob(string &jobId) { void LibYarnClient::dummyAllocate() { pthread_mutex_lock(&heartbeatLock); - ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient; - - //1) requestProto_blank - list<ResourceRequest> asksBlank; - //2) releasesBlank - list<ContainerId> releasesBlank; - //3) blacklistRequestBlank - ResourceBlacklistRequest blacklistRequestBlank; - //4) progress - float progress = 0.5; - int allocatedNum = 0; - - try { - LOG(INFO, "LibYarnClient::dummyAllocate, do a AM-RM heartbeat with response_id:%d", response_id); - AllocateResponse response = amrmClientAlias->allocate(asksBlank, - releasesBlank, - blacklistRequestBlank, - response_id, - progress); - response_id = response.getResponseId(); - list<Container> allocatedContainers = response.getAllocatedContainers(); - allocatedNum = allocatedContainers.size(); - LOG(INFO,"LibYarnClient::dummyAllocate returned response_id :%d", response_id); - if (allocatedNum > 0) { - /* - * In rare case, client gets allocated containers in heartbeat, - * free them immediately. - */ - LOG(INFO, "LibYarnClient::dummyAllocate returned allocated size: %d, " - "free them immediately.", allocatedNum); - list<ContainerId> releases; - for (list<Container>::iterator it = allocatedContainers.begin(); - it != allocatedContainers.end(); it++) { - releases.push_back((*it).getId()); - } - list<ResourceRequest> asksBlank; - ResourceBlacklistRequest blacklistRequestBlank; - response = amrmClientAlias->allocate(asksBlank, releases, - blacklistRequestBlank, response_id, progress); - response_id = response.getResponseId(); - } - pthread_mutex_unlock(&heartbeatLock); - } - catch (const YarnException &e) { - LOG(WARNING, "LibYarnClient::dummyAllocate, dummy allocation " - "is not correctly executed with exception raised. %s", - e.msg()); - pthread_mutex_unlock(&heartbeatLock); - throw; - } + ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient; + + //1) requestProto_blank + list<ResourceRequest> asksBlank; + //2) releasesBlank + list<ContainerId> releasesBlank; + //3) blacklistRequestBlank + ResourceBlacklistRequest blacklistRequestBlank; + //4) progress + float progress = 0.5; + int allocatedNum = 0; + + try { + LOG(DEBUG1, "LibYarnClient::dummyAllocate, do a AM-RM heartbeat with response_id:%d", response_id); + AllocateResponse response = amrmClientAlias->allocate(asksBlank, + releasesBlank, blacklistRequestBlank, response_id, progress); + response_id = response.getResponseId(); + list<Container> allocatedContainers = response.getAllocatedContainers(); + allocatedNum = allocatedContainers.size(); + LOG(DEBUG1, "LibYarnClient::dummyAllocate returned response_id :%d", response_id); + if (allocatedNum > 0) { + /* + * In rare case, client gets allocated containers in heartbeat, + * free them immediately. + */ + LOG(DEBUG1, "LibYarnClient::dummyAllocate returned allocated size: %d, " + "free them immediately.", allocatedNum); + list<ContainerId> releases; + for (list<Container>::iterator it = allocatedContainers.begin(); + it != allocatedContainers.end(); it++) { + releases.push_back((*it).getId()); + } + list<ResourceRequest> asksBlank; + ResourceBlacklistRequest blacklistRequestBlank; + response = amrmClientAlias->allocate(asksBlank, releases, + blacklistRequestBlank, response_id, progress); + response_id = response.getResponseId(); + } + pthread_mutex_unlock(&heartbeatLock); + } + catch (const YarnException &e) { + LOG(WARNING, "LibYarnClient::dummyAllocate, dummy allocation " + "is not correctly executed with exception raised. %s", + e.msg()); + pthread_mutex_unlock(&heartbeatLock); + throw; + } } void LibYarnClient::addResourceRequest(Resource capability, - int32_t num_containers, - string host, - int32_t priority, - bool relax_locality) + int32_t num_containers, string host, int32_t priority, + bool relax_locality) { - ResourceRequest *req = new ResourceRequest(); - req->setCapability(capability); - req->setNumContainers(num_containers); - Priority priorityProto; - priorityProto.setPriority(priority); - req->setPriority(priorityProto); - req->setResourceName(host); - req->setRelaxLocality(relax_locality); - try { - askRequests.push_back(*req); - LOG(INFO, "LibYarnClient::put a request into ask list, " - "mem:%d, cpu:%d, priority:%d, resource name:%s, relax:%d, num_containers:%d", - capability.getMemory(), capability.getVirtualCores(), priority, host.c_str(), - relax_locality, num_containers); - } catch (std::exception &e) { - LOG(WARNING, "LibYarnClient::Fail to add a resource request " - "to ask list. %s ", e.what()); - } + ResourceRequest *req = new ResourceRequest(); + req->setCapability(capability); + req->setNumContainers(num_containers); + Priority priorityProto; + priorityProto.setPriority(priority); + req->setPriority(priorityProto); + req->setResourceName(host); + req->setRelaxLocality(relax_locality); + try { + askRequests.push_back(*req); + LOG(DEBUG1, "LibYarnClient::put a request into ask list, " + "mem:%d, cpu:%d, priority:%d, resource name:%s, relax:%d, num_containers:%d", + capability.getMemory(), capability.getVirtualCores(), priority, host.c_str(), + relax_locality, num_containers); + } catch (std::exception &e) { + LOG(WARNING, "LibYarnClient::Fail to add a resource request " + "to ask list. %s ", e.what()); + } } /* @@ -416,84 +414,85 @@ void LibYarnClient::addResourceRequest(Resource capability, * is supported. * * Parameters: - * jobId: jobId - * capability: the quota of the resource - * count: the required number of the containers - * preferred: node list, NULL means ANY host. If one node's rack name is NULL, a default rack name is set. - * priority: priority + * jobId: jobId + * capability: the quota of the resource + * count: the required number of the containers + * preferred: node list, NULL means ANY host. If one node's rack name is NULL, a default rack name is set. + * priority: priority */ -int LibYarnClient::addContainerRequests(string &jobId, Resource &capability, int32_t num_containers, - list<LibYarnNodeInfo> &preferred, int32_t priority, bool relax_locality) +int LibYarnClient::addContainerRequests(string &jobId, Resource &capability, + int32_t num_containers, list<LibYarnNodeInfo> &preferred, + int32_t priority, bool relax_locality) { - try { - if (jobId != clientJobId) { - throw std::invalid_argument("The jobId is wrong, check the jobId argument"); - } - - map<string, int32_t> inferredRacks; - - for (list<LibYarnNodeInfo>::iterator iter = preferred.begin(); - iter != preferred.end(); iter++) { - LOG(INFO, "LibYarnClient::addContainerRequests, " - "get a preferred host info, host:%s,rack:%s,container number:%d", - iter->getHost().c_str(), iter->getRack().c_str(), iter->getContainerNum()); - /* add a resource request for this node */ - addResourceRequest(capability, iter->getContainerNum(), iter->getHost(), priority, true); - map<string, int32_t>:: iterator it = inferredRacks.find(iter->getRack()); - if (it != inferredRacks.end()) - it->second += iter->getContainerNum(); - else - inferredRacks.insert(make_pair(iter->getRack(), iter->getContainerNum())); - } - - /* add resource requests for racks*/ - for (map<string, int32_t>:: iterator it = inferredRacks.begin() ; - it != inferredRacks.end(); it++) - addResourceRequest(capability, it->second, it->first, priority, relax_locality); - - /* add resource request for off-switch */ - addResourceRequest(capability, num_containers, YARN_HOST_ANY, priority, relax_locality); - - return FR_SUCCEEDED; - } catch (std::exception &e) { - stringstream errorMsg; - errorMsg << "LibYarnClient::addContainerRequests catch std exception:" << e.what(); + try { + if (jobId != clientJobId) { + throw std::invalid_argument("The jobId is wrong, check the jobId argument"); + } + + map<string, int32_t> inferredRacks; + + for (list<LibYarnNodeInfo>::iterator iter = preferred.begin(); + iter != preferred.end(); iter++) { + LOG(DEBUG1, "LibYarnClient::addContainerRequests, " + "get a preferred host info, host:%s,rack:%s,container number:%d", + iter->getHost().c_str(), iter->getRack().c_str(), iter->getContainerNum()); + /* add a resource request for this node */ + addResourceRequest(capability, iter->getContainerNum(), iter->getHost(), priority, true); + map<string, int32_t>::iterator it = inferredRacks.find(iter->getRack()); + if (it != inferredRacks.end()) + it->second += iter->getContainerNum(); + else + inferredRacks.insert(make_pair(iter->getRack(), iter->getContainerNum())); + } + + /* add resource requests for racks*/ + for (map<string, int32_t>::iterator it = inferredRacks.begin(); + it != inferredRacks.end(); it++) { + addResourceRequest(capability, it->second, it->first, priority, relax_locality); + } + + /* add resource request for off-switch */ + addResourceRequest(capability, num_containers, YARN_HOST_ANY, priority, relax_locality); + + return FR_SUCCEEDED; + } catch (std::exception &e) { + stringstream errorMsg; + errorMsg << "LibYarnClient::addContainerRequests catch std exception:" << e.what(); setErrorMessage(errorMsg.str()); - return FR_FAILED; - } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::addContainerRequests catch unexpected exception."; + return FR_FAILED; + } catch (...) { + stringstream errorMsg; + errorMsg << "LibYarnClient::addContainerRequests catch unexpected exception."; setErrorMessage(errorMsg.str()); - return FR_FAILED; - } + return FR_FAILED; + } } /* -message AllocateRequestProto { -repeated ResourceRequestProto ask = 1; -repeated ContainerIdProto release = 2; -optional ResourceBlacklistRequestProto blacklist_request = 3; -optional int32 response_id = 4; -optional float progress = 5; -} - -message AllocateResponseProto { -optional AMCommandProto a_m_command = 1; -optional int32 response_id = 2; -repeated ContainerProto allocated_containers = 3; -repeated ContainerStatusProto completed_container_statuses = 4; -optional ResourceProto limit = 5; -repeated NodeReportProto updated_nodes = 6; -optional int32 num_cluster_nodes = 7; -optional PreemptionMessageProto preempt = 8; -repeated NMTokenProto nm_tokens = 9; -} -*/ + message AllocateRequestProto { + repeated ResourceRequestProto ask = 1; + repeated ContainerIdProto release = 2; + optional ResourceBlacklistRequestProto blacklist_request = 3; + optional int32 response_id = 4; + optional float progress = 5; + } + + message AllocateResponseProto { + optional AMCommandProto a_m_command = 1; + optional int32 response_id = 2; + repeated ContainerProto allocated_containers = 3; + repeated ContainerStatusProto completed_container_statuses = 4; + optional ResourceProto limit = 5; + repeated NodeReportProto updated_nodes = 6; + optional int32 num_cluster_nodes = 7; + optional PreemptionMessageProto preempt = 8; + repeated NMTokenProto nm_tokens = 9; + } + */ int LibYarnClient::allocateResources(string &jobId, - list<string> &blackListAdditions, - list<string> &blackListRemovals, - list<Container> &allocatedContainers, - int32_t num_containers) { + list<string> &blackListAdditions, list<string> &blackListRemovals, + list<Container> &allocatedContainers, int32_t num_containers) +{ try{ AllocateResponse response; int retry = 5; @@ -509,8 +508,8 @@ int LibYarnClient::allocateResources(string &jobId, throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); } - ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient; - list<Container> allocatedContainerCache; + ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient; + list<Container> allocatedContainerCache; list<ContainerReport> preContainerReports; preContainerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId); @@ -520,15 +519,17 @@ int LibYarnClient::allocateResources(string &jobId, blacklistRequest.setBlacklistRemovals(blackListRemovals); float progress = 0.5; - LOG(INFO,"LibYarnClient::allocate, ask: container number:%d,", num_containers); + LOG(DEBUG1, "LibYarnClient::request to allocate resource, container number:%d", + num_containers); while (retry > 0) { - LOG(INFO,"LibYarnClient::allocate with response id : %d", response_id); - AllocateResponse response = amrmClientAlias->allocate(this->askRequests, releasesBlank, - blacklistRequest, response_id, progress); + LOG(DEBUG1, "LibYarnClient::allocate with response id : %d", response_id); + AllocateResponse response = amrmClientAlias->allocate( + this->askRequests, releasesBlank, blacklistRequest, + response_id, progress); response_id = response.getResponseId(); - LOG(INFO,"LibYarnClient::allocate returned response id : %d", response_id); - list<NMToken> nmTokens = response.getNMTokens(); + LOG(DEBUG1, "LibYarnClient::allocate returned response id : %d", response_id); + list<NMToken> nmTokens = response.getNMTokens(); for (list<NMToken>::iterator it = nmTokens.begin(); it != nmTokens.end(); it++) { std::ostringstream oss; oss << (*it).getNodeId().getHost() << ":" << (*it).getNodeId().getPort(); @@ -538,7 +539,7 @@ int LibYarnClient::allocateResources(string &jobId, list<Container> allocatedContainerOnce = response.getAllocatedContainers(); allocatedNumOnce = allocatedContainerOnce.size(); if (allocatedNumOnce <= 0) { - LOG(WARNING, "LibYarnClient:: fail to allocate from YARN RM, try again"); + LOG(DEBUG1, "LibYarnClient:: fail to allocate from YARN RM, try again"); retry--; if(retry == 0 && allocatedNumTotal == 0) { /* If failed, just return to Resource Broker to handle*/ @@ -549,10 +550,10 @@ int LibYarnClient::allocateResources(string &jobId, } else { allocatedNumTotal += allocatedNumOnce; allocatedContainerCache.insert(allocatedContainerCache.end(), allocatedContainerOnce.begin(), allocatedContainerOnce.end()); - LOG(INFO, "LibYarnClient:: allocate %d containers from YARN RM", allocatedNumOnce); + LOG(DEBUG1, "LibYarnClient:: allocate %d containers from YARN RM", allocatedNumOnce); if (allocatedNumTotal >= num_containers) { - LOG(INFO, "LibYarnClient:: allocate enough containers from YARN RM, " - "expected:%d, total:%d", num_containers, allocatedNumTotal); + LOG(DEBUG1, "LibYarnClient:: allocate enough containers from YARN RM, " + "expected:%d, total:%d", num_containers, allocatedNumTotal); break; } @@ -560,39 +561,43 @@ int LibYarnClient::allocateResources(string &jobId, usleep(TimeInterval::ALLOCATE_INTERVAL_MS); } - LOG(INFO,"LibYarnClient::allocate, ask: response_id:%d, allocated container number:%d", + LOG(INFO,"LibYarnClient::allocate resource, response_id:%d, allocated container number:%d", response_id, allocatedNumTotal); /* a workaround for allocate more container than request */ list<ContainerId> releases; list<ContainerReport> afterContainerReports; afterContainerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId); - for (list<ContainerReport>::iterator ait=afterContainerReports.begin(); - ait!=afterContainerReports.end(); ait++){ + for (list<ContainerReport>::iterator ait = afterContainerReports.begin(); + ait != afterContainerReports.end(); ait++) { bool foundInPre = false; - for (list<ContainerReport>::iterator pit=preContainerReports.begin();pit!=preContainerReports.end();pit++){ + for (list<ContainerReport>::iterator pit = + preContainerReports.begin(); + pit != preContainerReports.end(); pit++) { if (pit->getId().getId() == ait->getId().getId()) { foundInPre = true; break; } } - if (!foundInPre){ + if (!foundInPre) { bool foundInNewAllocated = false; - for (list<Container>::iterator cit = allocatedContainerCache.begin();cit!=allocatedContainerCache.end();cit++){ - if(cit->getId().getId() == ait->getId().getId()){ + for (list<Container>::iterator cit = + allocatedContainerCache.begin(); + cit != allocatedContainerCache.end(); cit++) { + if (cit->getId().getId() == ait->getId().getId()) { foundInNewAllocated = true; break; } } - if (!foundInNewAllocated){ + if (!foundInNewAllocated) { releases.push_back((*ait).getId()); } } } int totalNeedRelease = allocatedContainerCache.size() - num_containers; - LOG(INFO,"LibYarnClient::allocateResources, ask: finished: total_allocated_containers:%ld, total_need_release:%d", - allocatedContainerCache.size(), totalNeedRelease); + LOG(DEBUG1, "LibYarnClient::allocateResources, total_allocated_containers:%ld, total_need_release:%d", + allocatedContainerCache.size(), totalNeedRelease); if(totalNeedRelease > 0) { for (int i = 0; i < totalNeedRelease; i++) { list<Container>::iterator it = allocatedContainerCache.begin(); @@ -608,15 +613,16 @@ int LibYarnClient::allocateResources(string &jobId, } /* 3. store allocated containers */ - for(list<Container>::iterator it = allocatedContainerCache.begin();it != allocatedContainerCache.end();it++){ + for (list<Container>::iterator it = allocatedContainerCache.begin(); + it != allocatedContainerCache.end(); it++) { Container *container = new Container((*it)); int64_t containerId = container->getId().getId(); jobIdContainers[containerId] = container; } allocatedContainers = allocatedContainerCache; - LOG(INFO,"LibYarnClient::allocateResources, put all allocated containers size:%ld", - allocatedContainerCache.size()); + LOG(DEBUG1, "LibYarnClient::allocateResources, put all allocated containers size:%ld", + allocatedContainerCache.size()); pthread_mutex_unlock(&heartbeatLock); @@ -649,12 +655,13 @@ int LibYarnClient::allocateResources(string &jobId, } } -int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[],int releaseContainerSize) { +int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[], int releaseContainerSize) +{ try{ pthread_mutex_lock(&heartbeatLock); - if (jobId != clientJobId) { - throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); - } + if (jobId != clientJobId) { + throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); + } if (!keepRun && needHeartbeatAlive) { throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); @@ -665,9 +672,9 @@ int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[], list<ResourceRequest> asksBlank; //2) releases list<ContainerId> releases; - for (int i = 0;i < releaseContainerSize;i++){ + for (int i = 0; i < releaseContainerSize; i++) { int64_t containerId = releaseContainerIds[i]; - map<int64_t,Container*>::iterator it = jobIdContainers.find(containerId); + map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId); if (it != jobIdContainers.end()) { releases.push_back((it->second)->getId()); } @@ -677,83 +684,85 @@ int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[], //4) progress float progress = 0.5; - LOG(INFO, "LibYarnClient::releaseResource, release size:%d",releases.size()); - AllocateResponse response = amrmClientAlias->allocate(asksBlank, releases, - blacklistRequestBlank, response_id, progress); + AllocateResponse response = amrmClientAlias->allocate(asksBlank, + releases, blacklistRequestBlank, response_id, progress); response_id = response.getResponseId(); //erase from the map jobIdContainers - for (list<ContainerId>::iterator it = releases.begin();it != releases.end();it++){ - LOG(INFO, "LibYarnClient::releaseResource, released ContainerId:%ld",it->getId()); - map<int64_t,Container*>::iterator cit = jobIdContainers.find(it->getId()); - if (cit != jobIdContainers.end()){ + for (list<ContainerId>::iterator it = releases.begin(); it != releases.end(); it++) { + LOG(DEBUG1, "LibYarnClient::releaseResource, released ContainerId:%ld", + it->getId()); + map<int64_t, Container*>::iterator cit = jobIdContainers.find(it->getId()); + if (cit != jobIdContainers.end()) { delete cit->second; cit->second = NULL; jobIdContainers.erase(it->getId()); } //erase the element if in activeFailContainers set<int64_t>::iterator sit = activeFailContainerIds.find(it->getId()); - if(sit != activeFailContainerIds.end()){ - LOG(INFO, "LibYarnClient::releaseResource, remove %ld from activeFailContainerIds",(*sit)); + if (sit != activeFailContainerIds.end()) { + LOG(INFO, "LibYarnClient::releaseResource, remove %ld from activeFailContainerIds", + (*sit)); activeFailContainerIds.erase(*sit); } } - LOG(INFO, "LibYarnClient::releaseResources, release complete"); + LOG(INFO, "LibYarnClient::release resources, container number:%d", + releases.size()); pthread_mutex_unlock(&heartbeatLock); return FR_SUCCEEDED; - } catch(std::exception &e) { + } catch (std::exception &e) { stringstream errorMsg; errorMsg << "LibYarnClient::releaseResources, catch exception:" << e.what(); setErrorMessage(errorMsg.str()); pthread_mutex_unlock(&heartbeatLock); return FR_FAILED; } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::releaseResources, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - pthread_mutex_unlock(&heartbeatLock); - return FR_FAILED; + stringstream errorMsg; + errorMsg << "LibYarnClient::releaseResources, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + pthread_mutex_unlock(&heartbeatLock); + return FR_FAILED; } } /* ---------------------- -message StartContainerRequestProto { - optional ContainerLaunchContextProto container_launch_context = 1; - optional hadoop.common.TokenProto container_token = 2; -} - -message StartContainerResponseProto { - repeated StringBytesMapProto services_meta_data = 1; -} ---------------------- -rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); ---------------------- -message StartContainersRequestProto { - repeated StartContainerRequestProto start_container_request = 1; -} - -message StartContainersResponseProto { - repeated StringBytesMapProto services_meta_data = 1; - repeated ContainerIdProto succeeded_requests = 2; - repeated ContainerExceptionMapProto failed_requests = 3; -} -*/ + --------------------- + message StartContainerRequestProto { + optional ContainerLaunchContextProto container_launch_context = 1; + optional hadoop.common.TokenProto container_token = 2; + } + + message StartContainerResponseProto { + repeated StringBytesMapProto services_meta_data = 1; + } + --------------------- + rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); + --------------------- + message StartContainersRequestProto { + repeated StartContainerRequestProto start_container_request = 1; + } + + message StartContainersResponseProto { + repeated StringBytesMapProto services_meta_data = 1; + repeated ContainerIdProto succeeded_requests = 2; + repeated ContainerExceptionMapProto failed_requests = 3; + } + */ int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],int activeContainerSize) { try{ - if (jobId != clientJobId) { - throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); - } + if (jobId != clientJobId) { + throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); + } if (!keepRun && needHeartbeatAlive) { throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); } - LOG(INFO, "LibYarnClient::activeResources, activeResources started"); + LOG(DEBUG1, "LibYarnClient::activeResources, activeResources started"); - for (int i = 0; i < activeContainerSize; i++){ + for (int i = 0; i < activeContainerSize; i++) { int64_t containerId = activeContainerIds[i]; map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId); if (it != jobIdContainers.end()) { - try{ + try { Container *container = it->second; ostringstream key; key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort(); @@ -768,27 +777,29 @@ int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],in request.setContainerLaunchCtx(ctx); Token cToken = container->getContainerToken(); request.setContainerToken(cToken); - LOG(INFO, "LibYarnClient::activeResources active containerId:%ld", containerId); + LOG(DEBUG1, "LibYarnClient::activeResources active containerId:%ld", containerId); ((ContainerManagement*)nmClient)->startContainer((*container), request, nmToken); - } catch(std::exception& e){ - LOG(INFO, "LibYarnClient::activeResources, activeResources Failed Id:%ld,exception:%s",containerId,e.what()); + } catch (std::exception& e) { + LOG(WARNING, "LibYarnClient::activeResources, activeResources Failed Id:%ld,exception:%s", + containerId,e.what()); activeFailContainerIds.insert(containerId); } } } - //using namespace Yarn::Internal; - LOG(INFO, "LibYarnClient::activeResources, activeResources finished"); + + LOG(INFO, "LibYarnClient::active resources, container number:%d", + activeContainerSize); return FR_SUCCEEDED; - } catch(std::exception& e){ + } catch (std::exception& e) { stringstream errorMsg; errorMsg << "LibYarnClient::activeResources, Catch the Exception:" << e.what(); setErrorMessage(errorMsg.str()); return FR_FAILED; } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::activeResources, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - return FR_FAILED; + stringstream errorMsg; + errorMsg << "LibYarnClient::activeResources, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + return FR_FAILED; } } int LibYarnClient::getActiveFailContainerIds(set<int64_t> &activeFailIds){ @@ -799,14 +810,14 @@ int LibYarnClient::getActiveFailContainerIds(set<int64_t> &activeFailIds){ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) { #ifndef MOCKTEST - if ( keepRun ) { + if (keepRun) { // No need to run heart-beat thread now. keepRun = false; void *thrc = NULL; int rc = pthread_join(heartbeatThread, &thrc); - if ( rc != 0 ) { - LOG(INFO, "LibYarnClient::finishJob, fail to join heart-beat thread. " - "error code %d", rc); + if (rc != 0) { + LOG(WARNING, "LibYarnClient::finishJob, fail to join heart-beat thread. " + "error code %d", rc); return FR_FAILED; } else { LOG(INFO, "LibYarnClient::finishJob, join heart-beat thread successfully."); @@ -814,30 +825,31 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) } #endif - try{ + try { if (jobId != clientJobId) { throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); } needHeartbeatAlive = false; //1. we should stop all containers related with this job - for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { + for (map<int64_t, Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { ostringstream key; Container *container = it->second; key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort(); Token nmToken = nmTokenCache[key.str()]; - ((ContainerManagement*)nmClient)->stopContainer((*container), nmToken); - LOG(INFO,"LibYarnClient::finishJob, container:%ld is stopped",container->getId().getId()); + ((ContainerManagement*) nmClient)->stopContainer((*container), nmToken); + LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld is stopped",container->getId().getId()); } - LOG(INFO,"LibYarnClient::finishJob, all containers for jobId:%s are stopped",jobId.c_str()); + LOG(DEBUG1, "LibYarnClient::finishJob, all containers for jobId:%s are stopped",jobId.c_str()); //2. finish AM string diagnostics(""); string tracking_url(""); ((ApplicationMaster*) amrmClient)->finishApplicationMaster(diagnostics, tracking_url, finalStatus); LOG(INFO, "LibYarnClient::finishJob, finish AM for jobId:%s, finalStatus:%d", jobId.c_str(), finalStatus); //free the Container* memory - for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) { - LOG(INFO,"LibYarnClient::finishJob, container:%ld in jobIdContainers are delete",it->second->getId().getId()); + for (map<int64_t, Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) { + LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld in jobIdContainers is deleted", + it->second->getId().getId()); delete it->second; it->second = NULL; } @@ -847,154 +859,150 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) return FR_SUCCEEDED; } catch (std::exception& e) { stringstream errorMsg; - errorMsg << "LibYarnClient::finishJob, catch the Exception:" << e.what(); setErrorMessage(errorMsg.str()); - return FR_FAILED; } catch (const ApplicationMasterNotRegisteredException &e) { stringstream errorMsg; - errorMsg << "LibYarnClient::finishJob, " "catch ApplicationMasterNotRegisteredException." << e.what(); setErrorMessage(errorMsg.str()); - return FR_FAILED; } catch (...) { stringstream errorMsg; - errorMsg << "LibYarnClient::finishJob, catch unexpected exception."; setErrorMessage(errorMsg.str()); - return FR_FAILED; } } -int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport &applicationReport){ - try { - if (jobId != clientJobId) { - throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); - } +int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport &applicationReport) { + try { + if (jobId != clientJobId) { + throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); + } if (!keepRun && needHeartbeatAlive) { throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); } - LOG(INFO,"LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d]", - clientAppId.getClusterTimestamp(), clientAppId.getId()); - applicationReport = ((ApplicationClient*) appClient)->getApplicationReport(clientAppId); - LOG(INFO,"LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d],getCurrentAppAttemptId:%d", - applicationReport.getApplicationId().getClusterTimestamp(), - applicationReport.getApplicationId().getId(), - applicationReport.getCurrentAppAttemptId().getAttemptId()); + LOG(DEBUG1, "LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d]", + clientAppId.getClusterTimestamp(), clientAppId.getId()); + applicationReport = ((ApplicationClient*) appClient)->getApplicationReport(clientAppId); + LOG(DEBUG1, "LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d],getCurrentAppAttemptId:%d", + applicationReport.getApplicationId().getClusterTimestamp(), + applicationReport.getApplicationId().getId(), + applicationReport.getCurrentAppAttemptId().getAttemptId()); - return FR_SUCCEEDED; + return FR_SUCCEEDED; } catch (std::exception& e) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getApplicationReport, Catch the Exception:" - << e.what(); - setErrorMessage(errorMsg.str()); - return FR_FAILED; - } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getApplicationReport, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - return FR_FAILED; + stringstream errorMsg; + errorMsg << "LibYarnClient::getApplicationReport, Catch the Exception:" + << e.what(); + setErrorMessage(errorMsg.str()); + return FR_FAILED; + } catch (...) { + stringstream errorMsg; + errorMsg << "LibYarnClient::getApplicationReport, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + return FR_FAILED; } } -int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport> &containerReports){ - try { - if (jobId != clientJobId) { - throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); - } +int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport> &containerReports) { + try { + if (jobId != clientJobId) { + throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); + } if (!keepRun && needHeartbeatAlive) { throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); } - LOG(INFO,"LibYarnClient::getContainerReports, appId[cluster_timestamp:%lld,id:%d]", - clientAppId.getClusterTimestamp(), clientAppId.getId()); - containerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId); - return FR_SUCCEEDED; - } catch (std::exception& e) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getContainerReports, Catch the Exception:" << e.what(); - setErrorMessage(errorMsg.str()); - return FR_FAILED; - } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getContainerReports, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - return FR_FAILED; + LOG(DEBUG1, "LibYarnClient::getContainerReports, appId[cluster_timestamp:%lld,id:%d]", + clientAppId.getClusterTimestamp(), clientAppId.getId()); + containerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId); + return FR_SUCCEEDED; + } catch (std::exception& e) { + stringstream errorMsg; + errorMsg << "LibYarnClient::getContainerReports, Catch the Exception:" << e.what(); + setErrorMessage(errorMsg.str()); + return FR_FAILED; + } catch (...) { + stringstream errorMsg; + errorMsg << "LibYarnClient::getContainerReports, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + return FR_FAILED; } } -int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int containerSize, - list<ContainerStatus> &containerStatues){ - try { - if (jobId != clientJobId) { - throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); - } +int LibYarnClient::getContainerStatuses(string &jobId, int64_t containerIds[], + int containerSize, list<ContainerStatus> &containerStatues) +{ + try { + if (jobId != clientJobId) { + throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); + } if (!keepRun && needHeartbeatAlive) { throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); } - for (int i = 0; i < containerSize; i++) { - int64_t containerId = containerIds[i]; - map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId); - if (it != jobIdContainers.end()) { - try { - Container *container = it->second; - ostringstream key; - key << container->getNodeId().getHost() << ":"<< container->getNodeId().getPort(); - Token nmToken = nmTokenCache[key.str()]; - ContainerStatus containerStatus = ((ContainerManagement*) nmClient)->getContainerStatus((*container), nmToken); - // the response containerId will be 0 if the request containerId is not exist - if (containerStatus.getContainerId().getId() != 0){ - containerStatues.push_back(containerStatus); - } - } catch (std::exception& e) { - LOG(INFO,"LibYarnClient::getContainerStatuses, getContainerStatuses Failed Id:%ld,exception:%s",containerId, e.what()); - } - } - } - return FR_SUCCEEDED; - } catch (std::exception& e) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getContainerStatuses, Catch the Exception:" << e.what(); - setErrorMessage(errorMsg.str()); - return FR_FAILED; - } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getContainerStatuses, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - return FR_FAILED; + for (int i = 0; i < containerSize; i++) { + int64_t containerId = containerIds[i]; + map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId); + if (it != jobIdContainers.end()) { + try { + Container *container = it->second; + ostringstream key; + key << container->getNodeId().getHost() << ":"<< container->getNodeId().getPort(); + Token nmToken = nmTokenCache[key.str()]; + ContainerStatus containerStatus = ((ContainerManagement*) nmClient)->getContainerStatus((*container), nmToken); + // the response containerId will be 0 if the request containerId is not exist + if (containerStatus.getContainerId().getId() != 0){ + containerStatues.push_back(containerStatus); + } + } catch (std::exception& e) { + LOG(INFO, "LibYarnClient::getContainerStatuses, getContainerStatuses Failed Id:%ld,exception:%s", + containerId, e.what()); + } + } + } + return FR_SUCCEEDED; + } catch (std::exception& e) { + stringstream errorMsg; + errorMsg << "LibYarnClient::getContainerStatuses, Catch the Exception:" << e.what(); + setErrorMessage(errorMsg.str()); + return FR_FAILED; + } catch (...) { + stringstream errorMsg; + errorMsg << "LibYarnClient::getContainerStatuses, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + return FR_FAILED; } } int LibYarnClient::getQueueInfo(string &queue, bool includeApps, - bool includeChildQueues, bool recursive,QueueInfo &queueInfo) { - try{ + bool includeChildQueues, bool recursive, QueueInfo &queueInfo) +{ + try { if (!keepRun && needHeartbeatAlive) { throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); } - queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue, includeApps, - includeChildQueues, recursive); + queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue, + includeApps, includeChildQueues, recursive); return FR_SUCCEEDED; - } - catch(std::exception& e){ + } catch (std::exception& e) { stringstream errorMsg; errorMsg << "LibYarnClient::getQueueInfo, Catch the Exception:" << e.what(); setErrorMessage(errorMsg.str()); return FR_FAILED; } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getQueueInfo, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - return FR_FAILED; + stringstream errorMsg; + errorMsg << "LibYarnClient::getQueueInfo, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + return FR_FAILED; } } @@ -1003,19 +1011,18 @@ int LibYarnClient::getClusterNodes(list<NodeState> &states,list<NodeReport> &nod if (!keepRun && needHeartbeatAlive) { throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); } - nodeReports = ((ApplicationClient*) appClient)->getClusterNodes(states); + nodeReports = ((ApplicationClient*) appClient)->getClusterNodes(states); return FR_SUCCEEDED; - } - catch(std::exception& e){ + } catch (std::exception& e) { stringstream errorMsg; errorMsg << "LibYarnClient::getClusterNodes, Catch the Exception:" << e.what(); setErrorMessage(errorMsg.str()); return FR_FAILED; } catch (...) { - stringstream errorMsg; - errorMsg << "LibYarnClient::getClusterNodes, catch unexpected exception."; - setErrorMessage(errorMsg.str()); - return FR_FAILED; + stringstream errorMsg; + errorMsg << "LibYarnClient::getClusterNodes, catch unexpected exception."; + setErrorMessage(errorMsg.str()); + return FR_FAILED; } } }