mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r262814871
########## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ########## @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() { } } } + + + /** + * Defines a specific implementation of {@link CoordinationSessionListener} for local {@link CoordinationUtils} + */ + private final class LocalCoordinationSessionListener implements CoordinationSessionListener { + + /** + * If the coordination utils session has reconnected, check if global runid differs from local runid + * if it differs then shut down processor and throw exception + * else recreate ephemeral node corresponding to this processor inside the read write lock for runid + */ + @Override + public void handleReconnect() { + LOG.info("Reconnected to coordination utils"); + if(coordinationUtils == null) { + return; + } + DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess(); + String globalRunId = (String) runIdAccess.readData(RUNID_PATH); + if( runId != globalRunId){ + processors.forEach(StreamProcessor::stop); + cleanup(); + appStatus = ApplicationStatus.UnsuccessfulFinish; + String msg = String.format("run.id %s on processor %s differs from the global run.id %s", runId, uid, globalRunId); + throw new SamzaException(msg); + } else if(runIdLock != null) { + String msg = String.format("Processor {} failed to get the lock for run.id", uid); + try { + // acquire lock to recreate active processor ephemeral node + DistributedReadWriteLock.AccessType lockAccess = runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT); + if(lockAccess == DistributedReadWriteLock.AccessType.WRITE || lockAccess == DistributedReadWriteLock.AccessType.READ) { Review comment: We would need to differentiate between the session-expiration and the reconnect scenarios. In the former, the node is guaranteed to have been cleaned up on a majority of the quorum. In the latter, the node still exists and nothing needs to be done, because the reconnected session reuses the old session ID. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services