cameronlee314 commented on a change in pull request #1213: SAMZA-2305: Stream processor should ensure previous container is stopped during a rebalance
URL: https://github.com/apache/samza/pull/1213#discussion_r356299733
 
 

 ##########
 File path: samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
 ##########
 @@ -200,6 +200,11 @@ class KeyValueStorageEngine[K, V](
 
     // flush the store and the changelog producer
     flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog producers. This only flushes the stores.
+
+    if (Thread.currentThread().isInterrupted) {
+      warn("Received an interrupt during store restoration. Exiting without restoring the full state.")
+      throw new InterruptedException("Received an interrupt during store restoration.")
+    }
 
 Review comment:
   Did you place the interrupt check in this block because you wanted to ensure that `flush` is always called first?
   The situation I have in mind is RocksDB handling interrupts itself inside `doPutAll` (maybe not today, but it is possible in the future), which could cause `flush` to be skipped.
   I just want to make sure all interrupt scenarios are handled. If you can confirm that this change handles them correctly, then I am good with this.
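
   To illustrate the concern, here is a minimal sketch of one way to keep `flush` from being skipped, assuming the write path could throw on interrupt. `RestoreSketch`, `restore`, `putAll`, and `flushCount` are hypothetical stand-ins for illustration, not the actual `KeyValueStorageEngine` code:

```scala
// Hypothetical stand-in for the restore path; not the real KeyValueStorageEngine API.
object RestoreSketch {
  // Counts flush calls so the behavior can be observed.
  var flushCount = 0

  def flush(): Unit = flushCount += 1

  // Writes each batch via putAll, always flushes, then surfaces any pending interrupt.
  def restore(batches: Iterator[Seq[Int]], putAll: Seq[Int] => Unit): Unit = {
    try {
      // Stop consuming batches as soon as the thread is marked interrupted.
      while (batches.hasNext && !Thread.currentThread().isInterrupted) {
        putAll(batches.next())
      }
    } finally {
      // Runs on normal completion, on early exit, and when putAll throws.
      flush()
    }

    // Same check as in the diff: isInterrupted does not clear the interrupt flag.
    if (Thread.currentThread().isInterrupted) {
      throw new InterruptedException("Received an interrupt during store restoration.")
    }
  }
}
```

   With the `finally` block, `flush` still runs if `putAll` starts throwing `InterruptedException` in the future. Whether flushing on an already interrupted thread is safe for the underlying store is a separate question that this sketch does not answer.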
