ajothomas commented on code in PR #1708:
URL: https://github.com/apache/samza/pull/1708#discussion_r1811544638


##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -312,10 +317,22 @@ class TaskInstance(
 
     val commitStartNs = System.nanoTime()
     // first check if there were any unrecoverable errors during the async stage of the pending commit
-    // and if so, shut down the container.
+    // If there is unrecoverable error, increment the metric and the counter.
+    // Shutdown the container in the following scenarios:
+    // 1. skipCommitDuringFailureEnabled is not enabled
+    // 2. skipCommitDuringFailureEnabled is enabled but the number of exceptions exceeded the max count
+    // Otherwise, ignore the exception.
     if (commitException.get() != null) {
-      throw new SamzaException("Unrecoverable error during pending commit for taskName: %s." format taskName,
-        commitException.get())
+      metrics.commitExceptions.inc()
+      commitExceptionCounter += 1
+      if (!skipCommitDuringFailureEnabled || commitExceptionCounter > skipCommitExceptionMaxLimit) {
+        throw new SamzaException("Unrecoverable error during pending commit for taskName: %s. Exception Counter: %s"
+          format (taskName, commitExceptionCounter), commitException.get())
+      } else {
+        warn("Ignored the commit failure for taskName %s: %s" format (taskName, commitException.get().getMessage))

Review Comment:
   Let's also include the exception count (`commitExceptionCounter`) in this warning log.



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -312,10 +317,22 @@ class TaskInstance(
 
     val commitStartNs = System.nanoTime()
     // first check if there were any unrecoverable errors during the async stage of the pending commit
-    // and if so, shut down the container.
+    // If there is unrecoverable error, increment the metric and the counter.
+    // Shutdown the container in the following scenarios:
+    // 1. skipCommitDuringFailureEnabled is not enabled
+    // 2. skipCommitDuringFailureEnabled is enabled but the number of exceptions exceeded the max count
+    // Otherwise, ignore the exception.
     if (commitException.get() != null) {
-      throw new SamzaException("Unrecoverable error during pending commit for taskName: %s." format taskName,
-        commitException.get())
+      metrics.commitExceptions.inc()
+      commitExceptionCounter += 1

Review Comment:
   Shouldn't we be counting only successive commit failures? If a commit
   succeeds, shouldn't we reset the counter to 0? Otherwise, anytime the 5th
   commit failure is hit during the lifetime of the task in the container,
   the container would fail.



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -312,10 +317,22 @@ class TaskInstance(
 
     val commitStartNs = System.nanoTime()
     // first check if there were any unrecoverable errors during the async stage of the pending commit
-    // and if so, shut down the container.
+    // If there is unrecoverable error, increment the metric and the counter.
+    // Shutdown the container in the following scenarios:
+    // 1. skipCommitDuringFailureEnabled is not enabled
+    // 2. skipCommitDuringFailureEnabled is enabled but the number of exceptions exceeded the max count
+    // Otherwise, ignore the exception.
     if (commitException.get() != null) {
-      throw new SamzaException("Unrecoverable error during pending commit for taskName: %s." format taskName,
-        commitException.get())
+      metrics.commitExceptions.inc()
+      commitExceptionCounter += 1
+      if (!skipCommitDuringFailureEnabled || commitExceptionCounter > skipCommitExceptionMaxLimit) {
+        throw new SamzaException("Unrecoverable error during pending commit for taskName: %s. Exception Counter: %s"
+          format (taskName, commitExceptionCounter), commitException.get())
+      } else {
+        warn("Ignored the commit failure for taskName %s: %s" format (taskName, commitException.get().getMessage))
+        commitException.set(null)
+        commitInProgress.release()

Review Comment:
   Why are we releasing the semaphore here? I don't think this is necessary.
   It will always be released as part of the `finally` clause in the
   private `handleCompletion` method.
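
   As a rough illustration of why an extra release could be harmful, the
   Java sketch below shows that `java.util.concurrent.Semaphore` does not
   cap its permits at the initial value. `DoubleReleaseDemo` is a
   hypothetical class, and the control flow is a simplification that
   assumes the `finally` release runs for the same acquisition; it is not
   the actual `TaskInstance` code.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical demo (not Samza code): releasing twice for one acquire. */
final class DoubleReleaseDemo {
  static int permitsAfterExtraRelease() {
    // One permit, mirroring a "single commit in progress" guard.
    Semaphore commitInProgress = new Semaphore(1);
    commitInProgress.acquireUninterruptibly();
    try {
      // Extra release on the failure-handling path.
      commitInProgress.release();
    } finally {
      // Release that always runs when the async stage completes.
      commitInProgress.release();
    }
    // Two permits are now available, so two commits could proceed at once.
    return commitInProgress.availablePermits();
  }
}
```

   If that assumption holds, dropping the extra `release()` keeps the
   semaphore at a single permit.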



##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -65,6 +65,15 @@ public class TaskConfig extends MapConfig {
   public static final String COMMIT_TIMEOUT_MS = "task.commit.timeout.ms";
   static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();
 
+  public static final String SKIP_COMMIT_DURING_FAILURES_ENABLED = "task.commit.skip.commit.during.failures.enabled";
+  private static final boolean DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED = false;
+
+  public static final String SKIP_COMMIT_EXCEPTION_MAX_LIMIT = "task.commit.skip.commit.exception.max.limit";

Review Comment:
   Could you add code comments explaining what each config means?
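
   Something along these lines, as a sketch only. The two keys and the
   `false` default are taken from the diff; the class name
   `CommitSkipConfigKeys` is hypothetical, and the wording of each comment
   is my reading of the `TaskInstance` change, so please correct it where
   it does not match the intended semantics.

```java
/** Hypothetical holder class, used here only to show the documented constants. */
final class CommitSkipConfigKeys {
  /**
   * When true, an exception from the async stage of a pending commit is
   * logged and ignored instead of shutting down the container, until the
   * limit configured by SKIP_COMMIT_EXCEPTION_MAX_LIMIT is exceeded.
   * Disabled by default.
   */
  static final String SKIP_COMMIT_DURING_FAILURES_ENABLED =
      "task.commit.skip.commit.during.failures.enabled";
  static final boolean DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED = false;

  /**
   * Maximum number of commit exceptions tolerated while skipping is
   * enabled; once the count exceeds this value the container is shut down.
   */
  static final String SKIP_COMMIT_EXCEPTION_MAX_LIMIT =
      "task.commit.skip.commit.exception.max.limit";

  private CommitSkipConfigKeys() { }
}
```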



