yashmayya commented on code in PR #13453:
URL: https://github.com/apache/kafka/pull/13453#discussion_r1149108767


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##########
@@ -596,7 +596,11 @@ private void readTaskStatus(String key, byte[] value) {
         synchronized (this) {
             log.trace("Received task {} status update {}", id, status);
             CacheEntry<TaskStatus> entry = getOrAdd(id);
-            entry.put(status);
+            if (entry.canWriteSafely(status)) {
+                entry.put(status);
+            } else {
+                log.trace("Ignoring stale status update {} for task {}", status, id);
+            }

Review Comment:
   We might need a similar fix for connector statuses as well?
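   The guard in the diff only compares the worker ID and generation of the cached and incoming statuses. Connector statuses carry both fields just like task statuses (both extend `AbstractStatus`), so the same check looks like it could apply unchanged to the connector read path. Below is a minimal, self-contained sketch of that check. It assumes `canWriteSafely` accepts an update when nothing is cached yet, when the update comes from the worker that wrote the cached value, or when the update's generation is not older than the cached one. `Status`, `StatusCache`, and `readStatus` are hypothetical stand-ins, not Kafka classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a connector or task status; only the fields the
// staleness check needs are modeled.
record Status(String workerId, String state, long generation) {}

// Hypothetical cache keyed by status key ("status-connector-..." or
// "status-task-..."); the guard does not care which kind of key it is.
class StatusCache {
    private final Map<String, Status> entries = new HashMap<>();

    // Assumed rule: accept when nothing is cached, when the update comes from
    // the worker that wrote the cached value, or when it is not from an older
    // generation.
    static boolean canWriteSafely(Status cached, Status update) {
        return cached == null
                || cached.workerId().equals(update.workerId())
                || cached.generation() <= update.generation();
    }

    // Mirrors the read path in the diff: apply the update only if it is not stale.
    boolean readStatus(String key, Status update) {
        Status cached = entries.get(key);
        if (canWriteSafely(cached, update)) {
            entries.put(key, update);
            return true;
        }
        return false; // stale update ignored
    }

    Status get(String key) {
        return entries.get(key);
    }

    public static void main(String[] args) {
        StatusCache cache = new StatusCache();
        String connectorKey = "status-connector-conn";
        cache.readStatus(connectorKey, new Status("anotherhost:8083", "RUNNING", 10L));
        boolean applied = cache.readStatus(connectorKey, new Status("thishost:8083", "UNASSIGNED", 9L));
        System.out.println(applied + " " + cache.get(connectorKey).state()); // false RUNNING
    }
}
```

   Since nothing in the check depends on whether the key belongs to a connector or a task, adding it to the connector counterpart of `readTaskStatus` would seem to be a small change.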



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##########
@@ -596,7 +596,11 @@ private void readTaskStatus(String key, byte[] value) {
         synchronized (this) {
             log.trace("Received task {} status update {}", id, status);
             CacheEntry<TaskStatus> entry = getOrAdd(id);
-            entry.put(status);
+            if (entry.canWriteSafely(status)) {

Review Comment:
   `AbstractHerder::onShutdown`, which is responsible for updating a task's status
to `UNASSIGNED` after the task has been shut down on a worker, uses
`StatusBackingStore::putSafe`. This "safe" send implementation already performs
the same check (whether the "newer" status update comes from an older
generation) against any pre-existing cache entry before writing to the
underlying Kafka-based log. However, it looks like the cache is only updated
when records are read back from the log (on the `KafkaBasedLog`'s work thread),
not when the write itself is made. So is the root cause of the bug you're
fixing essentially a race condition in which `KafkaStatusBackingStore` can't
read its own writes (at least not immediately)? If so, I wonder whether we could
change the design of the status backing store so that it also updates its cache
on writes. Then again, since the writes are asynchronous we could run into
issues there as well, so overall it may be better to check generations when
reading through the log and ensure that the reflected status always corresponds
to the latest generation. What do you think?
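   To make the suspected race concrete, here is a deterministic, single-threaded model with hand-picked interleavings (the hypothetical `RaceModel` class, not Kafka code). The log is a list, the cache is updated only when the reader catches up on the log, and `putSafe` checks the possibly stale cache before appending. It assumes the same acceptance rule as `canWriteSafely` in the diff. This worker's check passes because its cache still holds its own generation 9 status, so the stale `UNASSIGNED` record is written; only a read-side check keeps it from overwriting the generation 10 `RUNNING` status:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: the cache is updated only when the reader catches up
// on the log (like KafkaBasedLog's work thread), never on write.
class RaceModel {
    record Status(String workerId, String state, long generation) {}

    static boolean canWriteSafely(Status cached, Status update) {
        return cached == null
                || cached.workerId().equals(update.workerId())
                || cached.generation() <= update.generation();
    }

    final List<Status> log = new ArrayList<>();
    final Map<String, Status> cache = new HashMap<>();
    final boolean checkOnRead;
    int readOffset = 0;

    RaceModel(boolean checkOnRead) {
        this.checkOnRead = checkOnRead;
    }

    // Like putSafe: check against the (possibly stale) cache, then append.
    // The cache itself is not touched here.
    void putSafe(Status update) {
        if (canWriteSafely(cache.get("task"), update)) {
            log.add(update);
        }
    }

    // A write that lands in the log without going through this worker's cache.
    void append(Status update) {
        log.add(update);
    }

    // The reader catching up on the log, optionally applying the read-side check.
    void catchUp() {
        while (readOffset < log.size()) {
            Status update = log.get(readOffset++);
            if (!checkOnRead || canWriteSafely(cache.get("task"), update)) {
                cache.put("task", update);
            }
        }
    }

    static String finalState(boolean checkOnRead) {
        RaceModel m = new RaceModel(checkOnRead);
        m.append(new Status("thishost:8083", "RUNNING", 9L));
        m.catchUp();                                                  // cache: this worker, generation 9
        m.append(new Status("anotherhost:8083", "RUNNING", 10L));     // rebalance moved the task
        m.putSafe(new Status("thishost:8083", "UNASSIGNED", 9L));     // onShutdown, before catching up
        m.catchUp();
        return m.cache.get("task").state();
    }

    public static void main(String[] args) {
        System.out.println(finalState(false)); // UNASSIGNED: the stale status wins
        System.out.println(finalState(true));  // RUNNING
    }
}
```

   Updating the cache on write would close this particular window, but with asynchronous sends the cache could then reflect records that have not (yet) landed in the log, which is why the read-side check seems like the more robust place for the guard.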



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java:
##########
@@ -308,6 +308,37 @@ public void readTaskState() {
         assertEquals(status, store.get(TASK));
     }
 
+    @Test
+    public void readTaskStateShouldIgnoreStaleStatuses() {
+        byte[] value = new byte[0];
+        String otherWorkerId = "anotherhost:8083";
+
+        // Another worker sends a RUNNING status in the most recent generation
+        Map<String, Object> firstStatusRead = new HashMap<>();
+        firstStatusRead.put("worker_id", otherWorkerId);
+        firstStatusRead.put("state", "RUNNING");
+        firstStatusRead.put("generation", 10L);
+
+        // This worker still ends up producing an UNASSIGNED status belonging to an
+        // older generation before it could read the newer RUNNING status from above.
+        Map<String, Object> secondStatusRead = new HashMap<>();
+        secondStatusRead.put("worker_id", WORKER_ID);
+        secondStatusRead.put("state", "UNASSIGNED");
+        secondStatusRead.put("generation", 9L);
+
+        when(converter.toConnectData(STATUS_TOPIC, value))
+                .thenReturn(new SchemaAndValue(null, firstStatusRead))
+                .thenReturn(new SchemaAndValue(null, secondStatusRead));
+
+        store.read(consumerRecord(0, "status-task-conn-0", value));
+        store.read(consumerRecord(0, "status-task-conn-0", value));
+
+        verify(kafkaBasedLog, never()).send(anyString(), any(), any(Callback.class));

Review Comment:
   I'm not sure what the point of this verification is since we're only making 
calls to the store's read methods and this verification would always be true 
with or without your other changes.
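   To illustrate, here is a hypothetical model of the store in which a counter stands in for `kafkaBasedLog.send(...)`. Running the same two reads with and without the generation guard leaves the send count at zero both times, while the cached state differs. `ReadOnlyVerificationDemo` and its members are invented for this sketch, and the guard assumes the same rule as `canWriteSafely` in the diff:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the store under test: sendCalls stands in for
// interactions with kafkaBasedLog.send(...), and read() applies records to the cache.
class ReadOnlyVerificationDemo {
    record Status(String workerId, String state, long generation) {}

    static final String KEY = "status-task-conn-0";

    int sendCalls = 0;
    final Map<String, Status> cache = new HashMap<>();
    final boolean guardEnabled;

    ReadOnlyVerificationDemo(boolean guardEnabled) {
        this.guardEnabled = guardEnabled;
    }

    // Never called from read(), so a "never sent" check passes either way.
    void send(Status status) {
        sendCalls++;
    }

    void read(String key, Status update) {
        Status cached = cache.get(key);
        boolean safe = cached == null
                || cached.workerId().equals(update.workerId())
                || cached.generation() <= update.generation();
        if (!guardEnabled || safe) {
            cache.put(key, update);
        }
    }

    static ReadOnlyVerificationDemo runScenario(boolean guardEnabled) {
        ReadOnlyVerificationDemo store = new ReadOnlyVerificationDemo(guardEnabled);
        store.read(KEY, new Status("anotherhost:8083", "RUNNING", 10L));
        store.read(KEY, new Status("thishost:8083", "UNASSIGNED", 9L));
        return store;
    }

    public static void main(String[] args) {
        for (boolean guard : new boolean[] {false, true}) {
            ReadOnlyVerificationDemo store = runScenario(guard);
            System.out.println("guard=" + guard
                    + " sendCalls=" + store.sendCalls
                    + " state=" + store.cache.get(KEY).state());
        }
    }
}
```

   So an assertion on the resulting state seems like what would actually exercise the change, e.g. checking (assuming `TASK` is the task that `status-task-conn-0` refers to) that `store.get(TASK)` still reports the generation 10 `RUNNING` status from `otherWorkerId`.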



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
