yashmayya commented on code in PR #13453:
URL: https://github.com/apache/kafka/pull/13453#discussion_r1149123939


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##########
@@ -596,7 +596,11 @@ private void readTaskStatus(String key, byte[] value) {
         synchronized (this) {
             log.trace("Received task {} status update {}", id, status);
             CacheEntry<TaskStatus> entry = getOrAdd(id);
-            entry.put(status);
+            if (entry.canWriteSafely(status)) {

Review Comment:
   `AbstractHerder::onShutdown`, which is responsible for updating a task's status
to `Unassigned` after the task has been shut down on a worker, uses
`StatusBackingStore::putSafe`. This "safe" send implementation performs the
same check (whether the "newer" status update comes from an older generation of
tasks) against pre-existing cache entries before writing to the underlying
Kafka-based log. However, it looks like the cache is only updated when records
are read back from the log (on the `KafkaBasedLog`'s work thread), not when the
write itself is made. So is the bug you're trying to fix essentially a race
condition in which the `KafkaStatusBackingStore` can't read its own writes (at
least not immediately)? If so, could we consider changing the design of the
status backing store so that it also updates its cache on writes? Then again,
we could run into issues there too, since the writes are asynchronous, so
perhaps it's better overall to check generations on reads through the log and
ensure that the reflected status always corresponds to the latest generation.
What do you think?
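To make the read-side option concrete, here is a minimal sketch of a cache that applies status records only as they are read back from the log and drops any update whose generation is older than the cached one. `Status`, `StatusCache`, and `onRead` are hypothetical stand-ins, and the acceptance rule in `canWriteSafely` is an assumed simplification of the generation check described above, not the actual `KafkaStatusBackingStore` code:

```java
import java.util.HashMap;
import java.util.Map;

public class GenerationCheckSketch {

    // Hypothetical, simplified status record: reporting worker, the group
    // generation it was reported in, and the task state.
    record Status(String workerId, int generation, String state) {}

    // Hypothetical read-side cache: it is only updated from records consumed
    // from the status log, never directly on write.
    static final class StatusCache {
        private final Map<String, Status> entries = new HashMap<>();

        // Assumed rule: accept if nothing is cached yet or the update's
        // generation is not older than the cached one.
        boolean canWriteSafely(Status current, Status update) {
            return current == null || current.generation() <= update.generation();
        }

        // Called for every record read back from the log, in log order.
        void onRead(String taskId, Status update) {
            Status current = entries.get(taskId);
            if (canWriteSafely(current, update)) {
                entries.put(taskId, update);
            }
        }

        Status get(String taskId) {
            return entries.get(taskId);
        }
    }

    public static void main(String[] args) {
        StatusCache cache = new StatusCache();
        // worker-b runs the task in generation 2 ...
        cache.onRead("conn-0", new Status("worker-b", 2, "RUNNING"));
        // ... and worker-a's late UNASSIGNED record from generation 1 is read after it.
        cache.onRead("conn-0", new Status("worker-a", 1, "UNASSIGNED"));
        System.out.println(cache.get("conn-0").state()); // prints RUNNING
    }
}
```

Because the check runs on the consuming side, every worker converges on the same status regardless of the order in which the asynchronous writes from different workers land in the log.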
   
   Edit: Never mind, I just realized that the status updates will be from 
different workers.
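One way to read this edit: each worker keeps its own local cache of the status topic, so updating the cache on write would only let a worker see its own writes, while the conflicting update comes from another worker that has not yet read that record. The sketch below illustrates that reading with a hypothetical `WorkerView` whose `putSafe` checks only the local cache and then writes through to it; the names and the check are assumptions for illustration, not Kafka's API:

```java
import java.util.HashMap;
import java.util.Map;

public class PerWorkerCacheSketch {

    // Hypothetical, simplified status record.
    record Status(String workerId, int generation, String state) {}

    // Hypothetical per-worker view of the status topic. A worker learns about
    // other workers' writes only by reading them back from the log.
    static final class WorkerView {
        final Map<String, Status> cache = new HashMap<>();

        // Assumed write-side check that consults only this worker's cache,
        // followed by a write-through update of that same local cache.
        boolean putSafe(String taskId, Status update) {
            Status current = cache.get(taskId);
            if (current != null && current.generation() > update.generation()) {
                return false; // stale according to what this worker has seen
            }
            cache.put(taskId, update); // write-through to the local cache only
            // A real store would also send the record to the log here; omitted.
            return true;
        }
    }

    public static void main(String[] args) {
        WorkerView workerA = new WorkerView();
        WorkerView workerB = new WorkerView();
        // worker-b writes RUNNING for generation 2 and caches it locally.
        workerB.putSafe("conn-0", new Status("worker-b", 2, "RUNNING"));
        // worker-a has not read that record yet, so its stale write still passes.
        boolean accepted = workerA.putSafe("conn-0", new Status("worker-a", 1, "UNASSIGNED"));
        System.out.println(accepted); // prints true
    }
}
```

Under this reading, write-through caching does not prevent the stale `Unassigned` record, which is why a generation check on the read path is the more robust option.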



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
