Hi all,

We are experiencing some weird behaviour with our interactive query service implementation. This is the flow we've implemented:
1. kafkaStreams.queryMetadataForKey(store, key, serializer) returns HostInfo{host='localhost', port=8562} as activeHost and [] as standbyHosts for the store and partition where the key would reside. We are not interested in standby hosts; luckily, we have an active host we can call.
2. We make an HTTP call to localhost:8562, asking for the key there.
3. Inside the 8562 host, we retrieve the store by calling kafkaStreams.store(parameters), with staleStores set to false in the parameters.
4. We check kafkaStreams.state().equals(RUNNING) to make sure we're in the RUNNING state.
5. We call store.get(key) to retrieve the key from the store, if it has been stored there.
6. The get method of our store implementation calls storeProvider.stores(storeName, storeType) to iterate over all the stores available on the host.
7. The storeProvider is a WrappingStoreProvider, which calls stores(storeQueryParameters) on each StreamThreadStateStoreProvider it wraps (just one in our case).
8. Because the StreamThread is in the RUNNING state, the logic inside that stores method selects the tasks with storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() : streamThread.activeTasks(). Since we set staleStores to false, the condition evaluates to false and streamThread.activeTasks() is used.
9. To our surprise, streamThread.activeTasks() returns an empty ArrayList, while streamThread.allTasks().values() returns one StandbyTask for the store we're looking for.
10. As there appear to be no active tasks for this store on this host, we hit the fabled "The state store, " + storeName + ", may have migrated to another instance." InvalidStateStoreException.

This flow is quite tricky: queryMetadataForKey returns an active host which, once queried, turns out to have only a standby task.
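To make the task selection in steps 8 to 10 concrete, here is a simplified model in plain Java. It is a hypothetical sketch, not Kafka's actual StreamThreadStateStoreProvider code: the Task class, selectTasks, findStores, the task id "0_3" and the store name "my-store" are made up for illustration, and IllegalStateException stands in for Kafka's InvalidStateStoreException so the example runs without the Kafka jars. It shows why a thread that holds only a standby task fails the lookup when stale stores are disabled, yet finds the store once stale stores are allowed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of steps 6-10; not Kafka's actual source.
public class TaskSelectionModel {

    // Hypothetical stand-in for an active or standby task owning some state stores.
    public static final class Task {
        final String id;
        final boolean active;
        final List<String> storeNames;

        public Task(String id, boolean active, List<String> storeNames) {
            this.id = id;
            this.active = active;
            this.storeNames = storeNames;
        }
    }

    // Mirrors: staleStoresEnabled ? allTasks().values() : activeTasks()
    public static List<Task> selectTasks(Map<String, Task> allTasks, boolean staleStoresEnabled) {
        List<Task> selected = new ArrayList<>();
        for (Task task : allTasks.values()) {
            if (staleStoresEnabled || task.active) {
                selected.add(task);
            }
        }
        return selected;
    }

    // Collects matching stores; an empty result produces the step-10 error.
    // Kafka throws InvalidStateStoreException; IllegalStateException keeps this dependency-free.
    public static List<String> findStores(Map<String, Task> allTasks, String storeName,
                                          boolean staleStoresEnabled) {
        List<String> found = new ArrayList<>();
        for (Task task : selectTasks(allTasks, staleStoresEnabled)) {
            if (task.storeNames.contains(storeName)) {
                found.add(task.id + "/" + storeName);
            }
        }
        if (found.isEmpty()) {
            throw new IllegalStateException("The state store, " + storeName
                    + ", may have migrated to another instance.");
        }
        return found;
    }

    public static void main(String[] args) {
        // Situation from step 9: the thread holds only a standby task for the store.
        Map<String, Task> allTasks = new HashMap<>();
        allTasks.put("0_3", new Task("0_3", false, List.of("my-store")));

        System.out.println(selectTasks(allTasks, false).size());      // 0
        System.out.println(findStores(allTasks, "my-store", true));   // [0_3/my-store]
        try {
            findStores(allTasks, "my-store", false);
        } catch (IllegalStateException e) {
            // Same message as the InvalidStateStoreException in step 10.
            System.out.println(e.getMessage());
        }
    }
}
```

Enabling stale stores would make this lookup succeed, but the answer would then come from a standby replica that may lag behind the active task, which is exactly what the flow above tries to avoid.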
I have also executed the queryMetadataForKey method on the active host itself: once before calling kafkaStreams.store in step 3, and again between steps 4 and 5. Each time the metadata is the same, saying that the host we're on at that moment is the active host. Could there be a difference between activeHost and activeTask?

Those of you also on the Confluent Community Slack might recognize this message, as our CTO has posted it there as well.

Cheers,
D.