mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r262813679
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##########
 @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() {
       }
     }
   }
+
+
+  /**
+   * Defines a specific implementation of {@link CoordinationSessionListener} 
for local {@link CoordinationUtils}
+   */
+  private final class LocalCoordinationSessionListener implements 
CoordinationSessionListener {
+
+    /**
+     * If the coordination utils session has reconnected, check if global 
runid differs from local runid
+     * if it differs then shut down processor and throw exception
+     * else recreate ephemeral node corresponding to this processor inside the 
read write lock for runid
+     */
+    @Override
+    public void handleReconnect() {
+      LOG.info("Reconnected to coordination utils");
+      if(coordinationUtils == null) {
+        return;
+      }
+      DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess();
+      String globalRunId = (String) runIdAccess.readData(RUNID_PATH);
+      if( runId != globalRunId){
+        processors.forEach(StreamProcessor::stop);
+        cleanup();
+        appStatus = ApplicationStatus.UnsuccessfulFinish;
+        String msg = String.format("run.id %s on processor %s differs from the 
global run.id %s", runId, uid, globalRunId);
+        throw new SamzaException(msg);
 
 Review comment:
   Can we populate the failure cause instead of throwing an exception? This 
exception will be thrown in the zk client thread and that wouldn't impact the 
main thread. It should be sufficient to stop the processors and update the 
status with unsuccessful finish and log the exception too. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to