yashmayya commented on code in PR #13453:
URL: https://github.com/apache/kafka/pull/13453#discussion_r1149108767
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##########
@@ -596,7 +596,11 @@ private void readTaskStatus(String key, byte[] value) {
         synchronized (this) {
             log.trace("Received task {} status update {}", id, status);
             CacheEntry<TaskStatus> entry = getOrAdd(id);
-            entry.put(status);
+            if (entry.canWriteSafely(status)) {
+                entry.put(status);
+            } else {
+                log.trace("Ignoring stale status update {} for task {}", status, id);
+            }

Review Comment:
   We might need a similar fix for connector statuses as well?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##########
@@ -596,7 +596,11 @@ private void readTaskStatus(String key, byte[] value) {
         synchronized (this) {
             log.trace("Received task {} status update {}", id, status);
             CacheEntry<TaskStatus> entry = getOrAdd(id);
-            entry.put(status);
+            if (entry.canWriteSafely(status)) {

Review Comment:
   `AbstractHerder::onShutdown`, which is responsible for updating a task's status to `UNASSIGNED` after it has been shut down on a worker, uses `StatusBackingStore::putSafe`. This "safe" send implementation performs the same check (whether the "newer" status update comes from an older generation) on pre-existing cache entries before writing to the underlying Kafka-based log. However, it looks like the cache is only updated when reading back from the log (on the `KafkaBasedLog`'s work thread), not when doing the write itself. So is the root cause of the bug you're fixing essentially a race condition where the `KafkaStatusBackingStore` isn't able to read its own writes (at least not immediately)? If so, I wonder whether we could consider altering the design of the status backing store to update its cache on writes as well?

   Although I guess we could run into issues there too, since the writes are asynchronous, so maybe it's better overall to check generations on reads through the log and ensure that the reflected status always corresponds to the latest generation. What do you think?

##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java:
##########
@@ -308,6 +308,37 @@ public void readTaskState() {
         assertEquals(status, store.get(TASK));
     }

+    @Test
+    public void readTaskStateShouldIgnoreStaleStatuses() {
+        byte[] value = new byte[0];
+        String otherWorkerId = "anotherhost:8083";
+
+        // Another worker sends a RUNNING status in the most recent generation
+        Map<String, Object> firstStatusRead = new HashMap<>();
+        firstStatusRead.put("worker_id", otherWorkerId);
+        firstStatusRead.put("state", "RUNNING");
+        firstStatusRead.put("generation", 10L);
+
+        // This worker, belonging to an older generation, still ends up producing an
+        // UNASSIGNED status before it could read the newer RUNNING status from above
+        Map<String, Object> secondStatusRead = new HashMap<>();
+        secondStatusRead.put("worker_id", WORKER_ID);
+        secondStatusRead.put("state", "UNASSIGNED");
+        secondStatusRead.put("generation", 9L);
+
+        when(converter.toConnectData(STATUS_TOPIC, value))
+                .thenReturn(new SchemaAndValue(null, firstStatusRead))
+                .thenReturn(new SchemaAndValue(null, secondStatusRead));
+
+        store.read(consumerRecord(0, "status-task-conn-0", value));
+        store.read(consumerRecord(0, "status-task-conn-0", value));
+
+        verify(kafkaBasedLog, never()).send(anyString(), any(), any(Callback.class));

Review Comment:
   I'm not sure what the point of this verification is, since we only make calls to the store's read methods here, so it would pass with or without your other changes.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
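To make the generation check under discussion concrete, here is a small, self-contained sketch of a stale-status guard. `Status`, `CacheEntry`, and the exact predicate inside `canWriteSafely` are simplified, hypothetical stand-ins for the real Kafka Connect classes (the actual `KafkaStatusBackingStore` logic may differ); the scenario in `main` mirrors the one in the PR's test:

```java
// Hypothetical, simplified model of the stale-status check discussed in the
// review above. This is an illustrative sketch, not Kafka Connect's code.
class Status {
    final String workerId;
    final String state;
    final long generation;

    Status(String workerId, String state, long generation) {
        this.workerId = workerId;
        this.state = state;
        this.generation = generation;
    }
}

class CacheEntry {
    private Status value;

    // A status can be written safely if the cache is empty, if the cached
    // status came from the same worker, or if the incoming status belongs to
    // a generation at least as new as the cached one.
    boolean canWriteSafely(Status status) {
        return value == null
                || value.workerId.equals(status.workerId)
                || value.generation <= status.generation;
    }

    void put(Status status) {
        this.value = status;
    }

    Status get() {
        return value;
    }
}

public class StaleStatusCheckSketch {
    public static void main(String[] args) {
        CacheEntry entry = new CacheEntry();

        // Another worker wrote a RUNNING status in the newest generation (10).
        entry.put(new Status("anotherhost:8083", "RUNNING", 10L));

        // A stale UNASSIGNED record from an older generation (9) of a
        // different worker arrives afterwards and should be rejected.
        Status stale = new Status("localhost:8083", "UNASSIGNED", 9L);
        if (entry.canWriteSafely(stale)) {
            entry.put(stale);
        }

        System.out.println(entry.get().state); // prints RUNNING
    }
}
```

Under this model, updating the cache on reads while rejecting lower-generation records gives the same end state regardless of the order in which the two records are consumed from the log, which is the property the PR's test is trying to pin down.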