This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f8bfe87 SAMZA-2464: Container shuts down when task fails to remove
old state checkpoint dirs (#1283)
f8bfe87 is described below
commit f8bfe875e9866d350857cf99e5c2b0ddaf7d8ac1
Author: bkonold <[email protected]>
AuthorDate: Tue Mar 3 14:22:29 2020 -0800
SAMZA-2464: Container shuts down when task fails to remove old state
checkpoint dirs (#1283)
---
.../src/main/scala/org/apache/samza/container/TaskInstance.scala | 6 +++++-
.../test/scala/org/apache/samza/container/TestTaskInstance.scala | 6 ++----
2 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 2a4f1d6..37aaeff 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -274,7 +274,11 @@ class TaskInstance(
if (storageManager != null) {
trace("Remove old checkpoint stores for taskName: %s" format taskName)
- storageManager.removeOldCheckpoints(checkpointId)
+ try {
+ storageManager.removeOldCheckpoints(checkpointId)
+ } catch {
+ case e: Exception => error("Failed to remove old checkpoints for task: %s. Current checkpointId: %s" format (taskName, checkpointId), e)
+ }
}
if (inputCheckpoint != null) {
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index a54ae72..90f1b58 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -336,7 +336,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
}
@Test
- def testCommitFailsIfErrorClearingOldCheckpoints() { // required for transactional state
+ def testCommitContinuesIfErrorClearingOldCheckpoints() { // required for transactional state
val commitsCounter = mock[Counter]
when(this.metrics.commits).thenReturn(commitsCounter)
@@ -352,10 +352,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
} catch {
case e: SamzaException =>
// exception is expected, container should fail if could not get changelog offsets.
- return
+ fail("Exception from removeOldCheckpoints should have been caught")
}
-
- fail("Should have failed commit if error getting newest changelog offests")
}
/**