mynameborat commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1010012333


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -76,7 +76,7 @@ List<TaskCallbackImpl> update(TaskCallbackImpl cb) {
   private final TaskCallbacks completedCallbacks = new TaskCallbacks();
   private final ScheduledExecutorService timer;
   private final TaskCallbackListener listener;
-  private final long timeout;
+  private long timeout;

Review Comment:
   In general, it's an anti-pattern to override instance variables after construction. We should probably find another way to enforce the timeout for drain.
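One possible alternative is to supply both timeouts at construction, keep them `final`, and pick between them per callback instead of reassigning a field. The sketch below is a minimal illustration under that assumption. `CallbackTimeouts` and its methods are hypothetical names, not Samza APIs.

```java
/**
 * Hypothetical immutable holder for callback timeouts. Both values are fixed at
 * construction, so nothing can overwrite the configured timeout afterwards.
 */
final class CallbackTimeouts {
  private final long timeoutMs;      // value of task.callback.timeout.ms
  private final long drainTimeoutMs; // value of task.callback.drain.timeout.ms

  CallbackTimeouts(long timeoutMs, long drainTimeoutMs) {
    this.timeoutMs = timeoutMs;
    this.drainTimeoutMs = drainTimeoutMs;
  }

  long timeoutMs() {
    return timeoutMs;
  }

  long drainTimeoutMs() {
    return drainTimeoutMs;
  }

  /** Selects the timeout for a callback without mutating any field. */
  long effectiveTimeoutMs(boolean draining) {
    return draining ? drainTimeoutMs : timeoutMs;
  }
}
```

Because no field is ever reassigned, the value read from `task.callback.timeout.ms` remains authoritative for normal processing. This sketch does not address how the drain state is detected.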



##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -90,6 +90,13 @@ public class TaskConfig extends MapConfig {
   // timeout for triggering a callback
   public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
   static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+  // timeout for triggering a callback during drain
+  public static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
+
+  // default timeout for triggering a callback during drain
+  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();

Review Comment:
   The default seems too high for applications to get notified in case of faults. What is the rationale behind choosing such a high value? Is there a systematic way to compute it?
   
   For example, what are the downstream dependencies of this function, and which SLAs should we account for when computing this value? Please document all of this in the configuration documentation as well.
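For reference, the proposed default `Duration.ofMinutes(30).toMillis()` evaluates to 1,800,000 ms. The sketch below shows how the new key might resolve to that default when unset. It is a minimal standalone version that uses a plain `Map<String, String>` in place of `TaskConfig`/`MapConfig`, and the `DrainTimeoutConfig` class and `getDrainCallbackTimeoutMs` method are hypothetical.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical standalone stand-in for how TaskConfig might expose the new key. */
final class DrainTimeoutConfig {
  static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
  // Default proposed in the PR (1,800,000 ms); its rationale is what the review asks for.
  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();

  private final Map<String, String> config;

  DrainTimeoutConfig(Map<String, String> config) {
    this.config = config;
  }

  /** Returns the configured drain timeout, or the default when the key is absent. */
  long getDrainCallbackTimeoutMs() {
    String value = config.get(DRAIN_CALLBACK_TIMEOUT_MS);
    return value == null ? DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS : Long.parseLong(value.trim());
  }
}
```

Any rationale for the default, such as the downstream dependencies and SLAs it is meant to cover, would naturally sit in the Javadoc next to these constants and in the configuration documentation.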



##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -127,4 +127,14 @@ public List<TaskCallbackImpl> updateCallback(TaskCallbackImpl callback) {
       return ImmutableList.of(callback);
     }
   }
+
+  /**
+   * Override the timeout set in the callback manager with the given new timeout.
+   * This is intended to be used with pipeline drain as we want to override the existing timeout with a higher timeout.
+   *
+   * @param timeout new timeout for process callbacks
+   * */
+  public void updateTaskCallbackTimeout(long timeout) {

Review Comment:
   Why are we overriding the existing timeout set during construction of the class? This is confusing, and it breaks the originally configured timeout that gets passed to the callback manager (`task.callback.timeout.ms`).
   
   With this change, we are changing user-facing behavior by setting the default message processing timeout to 30 minutes.
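The sketch below shows one way to keep the configured `task.callback.timeout.ms` intact while still giving drain a longer timeout. It makes two assumptions: the timer is a `ScheduledExecutorService`, like the `timer` field in `TaskCallbackManager`, and a non-positive timeout disables timeout scheduling, as the `-1` default suggests. `DrainAwareTimeoutScheduler`, `startDrain`, and `scheduleTimeout` are hypothetical names.

```java
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical scheduler: drain changes which timeout is chosen, never the configured one. */
final class DrainAwareTimeoutScheduler {
  private final ScheduledExecutorService timer;
  private final long timeoutMs;      // task.callback.timeout.ms; never reassigned
  private final long drainTimeoutMs; // task.callback.drain.timeout.ms
  private final AtomicBoolean draining = new AtomicBoolean(false);

  DrainAwareTimeoutScheduler(ScheduledExecutorService timer, long timeoutMs, long drainTimeoutMs) {
    this.timer = timer;
    this.timeoutMs = timeoutMs;
    this.drainTimeoutMs = drainTimeoutMs;
  }

  /** Switches to drain mode; the configured timeout field is left untouched. */
  void startDrain() {
    draining.set(true);
  }

  /**
   * Chooses the timeout when a callback is registered and schedules the timeout action.
   * Assumption: a non-positive timeout means "no timeout", so nothing is scheduled.
   */
  Optional<ScheduledFuture<?>> scheduleTimeout(Runnable onTimeout) {
    long delayMs = draining.get() ? drainTimeoutMs : timeoutMs;
    if (delayMs <= 0) {
      return Optional.empty();
    }
    ScheduledFuture<?> future = timer.schedule(onTimeout, delayMs, TimeUnit.MILLISECONDS);
    return Optional.of(future);
  }

  long configuredTimeoutMs() {
    return timeoutMs;
  }
}
```

In this design, callbacks registered before drain starts keep their original deadline. Only callbacks registered afterwards get the drain timeout.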
   



-- 
This is an automated message from the Apache Git Service.