mynameborat commented on a change in pull request #938: SAMZA-1531: Support
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r262814180
##########
File path:
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
##########
@@ -280,4 +368,47 @@ private void setApplicationFinalStatus() {
}
}
}
+
+
+ /**
+ * Defines a specific implementation of {@link CoordinationSessionListener}
for local {@link CoordinationUtils}
+ */
+ private final class LocalCoordinationSessionListener implements
CoordinationSessionListener {
+
+ /**
+ * If the coordination utils session has reconnected, check if global
runid differs from local runid
+ * if it differs then shut down processor and throw exception
+ * else recreate ephemeral node corresponding to this processor inside the
read write lock for runid
+ */
+ @Override
+ public void handleReconnect() {
+ LOG.info("Reconnected to coordination utils");
+ if(coordinationUtils == null) {
+ return;
+ }
+ DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess();
+ String globalRunId = (String) runIdAccess.readData(RUNID_PATH);
+ if( runId != globalRunId){
+ processors.forEach(StreamProcessor::stop);
+ cleanup();
+ appStatus = ApplicationStatus.UnsuccessfulFinish;
+ String msg = String.format("run.id %s on processor %s differs from the
global run.id %s", runId, uid, globalRunId);
+ throw new SamzaException(msg);
+ } else if(runIdLock != null) {
+ String msg = String.format("Processor {} failed to get the lock for
run.id", uid);
+ try {
+ // acquire lock to recreate active processor ephemeral node
+ DistributedReadWriteLock.AccessType lockAccess =
runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT);
+ if(lockAccess == DistributedReadWriteLock.AccessType.WRITE ||
lockAccess == DistributedReadWriteLock.AccessType.READ) {
+ LOG.info("Processor {} creates its active processor ephemeral node
in read write lock again" + uid);
+ runIdLock.unlock();
+ } else {
+ throw new SamzaException(msg);
+ }
+ } catch (TimeoutException e) {
+ throw new SamzaException(msg, e);
Review comment:
Refer to my previous comment about throwing the exception. If the intent is
to stop the LAR, we need to update the appStatus with the failure cause.
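
To make the suggestion concrete, here is a rough, self-contained sketch of recording the failure cause in the status before stopping. Everything in it (`ReconnectFailureSketch`, `Status`, `StatusCode`, `RunIdLock`, `failAndStop`) is a hypothetical stand-in and not the Samza API; the real change would go through `appStatus` in `LocalApplicationRunner`. Whether the listener should also rethrow is a separate question that this sketch leaves open.

```java
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the runner; none of these names come from Samza. */
final class ReconnectFailureSketch {

  enum StatusCode { RUNNING, UNSUCCESSFUL_FINISH }

  /** Status value that carries the failure cause alongside the code. */
  static final class Status {
    final StatusCode code;
    final Throwable cause; // null unless the run failed

    Status(StatusCode code, Throwable cause) {
      this.code = code;
      this.cause = cause;
    }
  }

  /** Hypothetical lock abstraction; returns false if the lock was not granted. */
  interface RunIdLock {
    boolean lock() throws TimeoutException;
  }

  private volatile Status appStatus = new Status(StatusCode.RUNNING, null);
  private volatile boolean stopped = false;

  Status status() {
    return appStatus;
  }

  boolean isStopped() {
    return stopped;
  }

  /** On reconnect, try the lock; on any failure record the cause and stop. */
  void handleReconnect(RunIdLock runIdLock, String uid) {
    String msg = String.format("Processor %s failed to get the lock for run.id", uid);
    try {
      if (!runIdLock.lock()) {
        failAndStop(new IllegalStateException(msg));
      }
    } catch (TimeoutException e) {
      failAndStop(new IllegalStateException(msg, e));
    }
  }

  /** Single place where the failure cause is attached to the status. */
  private void failAndStop(Throwable cause) {
    stopped = true; // stands in for stopping the processors and cleanup
    appStatus = new Status(StatusCode.UNSUCCESSFUL_FINISH, cause);
  }
}
```

With this shape, whoever waits on the runner can read the cause from the status rather than relying on an exception thrown on the coordination callback thread.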