yashmayya commented on code in PR #13453:
URL: https://github.com/apache/kafka/pull/13453#discussion_r1149108767
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##########
@@ -596,7 +596,11 @@ private void readTaskStatus(String key, byte[] value) {
synchronized (this) {
log.trace("Received task {} status update {}", id, status);
CacheEntry<TaskStatus> entry = getOrAdd(id);
- entry.put(status);
+ if (entry.canWriteSafely(status)) {
+ entry.put(status);
+ } else {
+ log.trace("Ignoring stale status update {} for task {}", status, id);
+ }
Review Comment:
We might need a similar fix for connector statuses as well?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java:
##########
@@ -596,7 +596,11 @@ private void readTaskStatus(String key, byte[] value) {
synchronized (this) {
log.trace("Received task {} status update {}", id, status);
CacheEntry<TaskStatus> entry = getOrAdd(id);
- entry.put(status);
+ if (entry.canWriteSafely(status)) {
Review Comment:
`AbstractHerder::onShutdown` which is responsible for updating the status to
`Unassigned` for a task after it has been shutdown on a worker uses
`StatusBackingStore::putSafe`. This "safe" send implementation also does the
same check (whether the "newer" status update is from an older generation of
tasks) on pre-existing cache entries before writing to the underlying Kafka
based log. However, it looks like the cache is only updated on reading back
from the log (on the `KafkaBasedLog`'s work thread) and it isn't updated when
doing the write itself. So is the root cause of the bug you're fixing
essentially a race condition where the `KafkaStatusBackingStore` can't read its
own writes (at least not immediately)? If so, could we consider altering the
design of the status backing store to update its cache on writes as well? Then
again, since the writes are asynchronous we could run into issues there too, so
perhaps it's better overall to check generations on reads through the log and
ensure that the reflected status always corresponds to the latest generation.
What do you think?
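To make the generation check under discussion concrete, here is a minimal
standalone sketch. The class, field, and method names below are simplified
stand-ins for illustration only, not the actual Kafka Connect types (the real
`CacheEntry#canWriteSafely` lives in `KafkaStatusBackingStore` and considers
more than just the generation):

```java
// Hypothetical, simplified model of a status cache entry that rejects
// updates from an older generation, mirroring the guard added in this PR.
public class GenerationCheckDemo {
    static class Status {
        final String state;
        final long generation;
        Status(String state, long generation) {
            this.state = state;
            this.generation = generation;
        }
    }

    static class CacheEntry {
        private Status value;

        // A write is "safe" if nothing is cached yet, or the incoming
        // status is from the same or a newer generation than the cached one.
        boolean canWriteSafely(Status status) {
            return value == null || status.generation >= value.generation;
        }

        void put(Status status) {
            this.value = status;
        }

        Status get() {
            return value;
        }
    }

    public static void main(String[] args) {
        CacheEntry entry = new CacheEntry();
        Status running = new Status("RUNNING", 10L);
        Status staleUnassigned = new Status("UNASSIGNED", 9L);

        entry.put(running);
        // The UNASSIGNED update from generation 9 is stale and is ignored.
        if (entry.canWriteSafely(staleUnassigned)) {
            entry.put(staleUnassigned);
        }
        System.out.println(entry.get().state); // prints "RUNNING"
    }
}
```

Checking the generation on the read path this way means the cache converges to
the latest-generation status regardless of the order in which records are
consumed from the log.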
##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java:
##########
@@ -308,6 +308,37 @@ public void readTaskState() {
assertEquals(status, store.get(TASK));
}
+ @Test
+ public void readTaskStateShouldIgnoreStaleStatuses() {
+ byte[] value = new byte[0];
+ String otherWorkerId = "anotherhost:8083";
+
+ // Another worker sends a RUNNING status in the most recent generation
+ Map<String, Object> firstStatusRead = new HashMap<>();
+ firstStatusRead.put("worker_id", otherWorkerId);
+ firstStatusRead.put("state", "RUNNING");
+ firstStatusRead.put("generation", 10L);
+
+ // This worker still ends up producing an UNASSIGNED status from an older
+ // generation before it could read the newer RUNNING status from above.
+ Map<String, Object> secondStatusRead = new HashMap<>();
+ secondStatusRead.put("worker_id", WORKER_ID);
+ secondStatusRead.put("state", "UNASSIGNED");
+ secondStatusRead.put("generation", 9L);
+
+ when(converter.toConnectData(STATUS_TOPIC, value))
+ .thenReturn(new SchemaAndValue(null, firstStatusRead))
+ .thenReturn(new SchemaAndValue(null, secondStatusRead));
+
+ store.read(consumerRecord(0, "status-task-conn-0", value));
+ store.read(consumerRecord(0, "status-task-conn-0", value));
+
+ verify(kafkaBasedLog, never()).send(anyString(), any(), any(Callback.class));
Review Comment:
I'm not sure what the point of this verification is, since we're only making
calls to the store's read methods and this verification would pass with or
without your other changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]