Hi All,
We are experiencing some weird behaviour with our interactive query service
implementation.
This is the flow we’ve implemented:
1. kafkaStreams.queryMetadataForKey(store, key, serializer) returns activeHost
HostInfo{host='localhost', port=8562} and standbyHosts [] for the store and
partition where the key would reside. We are not interested in standby hosts;
luckily, we have an active host that we can call.
2. We make an HTTP call to host localhost:8562, asking for the key there.
3. Inside the 8562 host, we retrieve the store by calling
kafkaStreams.store(parameters), using StoreQueryParameters with staleStores set
to false.
4. We call kafkaStreams.state().equals(RUNNING) to make sure we’re in the
RUNNING state.
5. Now we call store.get(key) to retrieve the value for the key from the store,
if it has been stored there.
6. The get method on our store implementation calls the
storeProvider.stores(storeName, storeType) method to iterate over all the
stores available on the host.
7. The storeProvider is a WrappingStoreProvider, which calls
storeProvider.stores(storeQueryParameters) for each
StreamThreadStateStoreProvider it wraps (just one in our case).
8. Because the logic inside that stores method finds that the StreamThread is
in the RUNNING state, it selects the tasks with
storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() :
streamThread.activeTasks(). The condition evaluates to false because we set
staleStores to false in the params, so streamThread.activeTasks() is used.
9. To our surprise, the streamThread.activeTasks() method returns an empty
ArrayList, while the streamThread.allTasks().values() returns one StandbyTask
for the store we’re looking for.
10. As there appear to be no active tasks for this store on this host, the
call fails with the fabled InvalidStateStoreException ("The state store, " +
storeName + ", may have migrated to another instance.").
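To make sure we read steps 8 to 10 correctly, we reduced the task selection to a small, self-contained Java model. This is only a sketch of what we think is happening, not Kafka Streams' actual code: TaskInfo, TaskType, candidateTasks and storesFor are hypothetical names, and "my-store" is a placeholder store name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StoreLookupModel {

    // Hypothetical stand-in for the two kinds of tasks a StreamThread can hold.
    enum TaskType { ACTIVE, STANDBY }

    // Hypothetical, simplified task: the store it hosts and whether it is active or standby.
    static final class TaskInfo {
        final String storeName;
        final TaskType type;

        TaskInfo(String storeName, TaskType type) {
            this.storeName = storeName;
            this.type = type;
        }
    }

    // Mirrors the ternary from step 8: with stale stores enabled every task is a
    // candidate, otherwise only active tasks are.
    static List<TaskInfo> candidateTasks(List<TaskInfo> allTasks, boolean staleStoresEnabled) {
        if (staleStoresEnabled) {
            return allTasks;
        }
        List<TaskInfo> active = new ArrayList<>();
        for (TaskInfo task : allTasks) {
            if (task.type == TaskType.ACTIVE) {
                active.add(task);
            }
        }
        return active;
    }

    // Collects the candidate tasks hosting the requested store. An empty result is
    // the case in which we end up with the "may have migrated" exception.
    static List<TaskInfo> storesFor(List<TaskInfo> allTasks, String storeName, boolean staleStoresEnabled) {
        List<TaskInfo> matches = new ArrayList<>();
        for (TaskInfo task : candidateTasks(allTasks, staleStoresEnabled)) {
            if (task.storeName.equals(storeName)) {
                matches.add(task);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Step 9: the thread holds one standby task for the store and no active task.
        List<TaskInfo> tasks = Collections.singletonList(new TaskInfo("my-store", TaskType.STANDBY));
        System.out.println("staleStores=false -> " + storesFor(tasks, "my-store", false).size());
        System.out.println("staleStores=true  -> " + storesFor(tasks, "my-store", true).size());
    }
}
```

Running main prints 0 for staleStores=false and 1 for staleStores=true, which matches the split we see on the 8562 host: the store is only reachable through the standby task.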
This flow is confusing: queryMetadataForKey reports an active host, but once
queried, that host turns out to hold only a standby task for the store.
I have also called queryMetadataForKey on the active host itself, once before
calling kafkaStreams.store in step 3 and again between steps 4 and 5. Both
times the metadata is the same: the host we’re on at that moment is reported as
the active host.
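The cross-check we did by hand can be written down as a small diagnostic: compare what the metadata says about this host with which tasks the local threads actually hold for the key's partition. In the real library, KafkaStreams#localThreadsMetadata() exposes per-thread activeTasks() and standbyTasks(); the sketch below deliberately avoids that API and uses a hypothetical LocalThreadView and diagnose helper instead, with partition 3 as a placeholder value.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ActiveHostCheck {

    // Hypothetical, simplified view of one local stream thread: the partitions it
    // holds as active tasks and the partitions it holds as standby tasks.
    static final class LocalThreadView {
        final Set<Integer> activePartitions;
        final Set<Integer> standbyPartitions;

        LocalThreadView(Set<Integer> activePartitions, Set<Integer> standbyPartitions) {
            this.activePartitions = activePartitions;
            this.standbyPartitions = standbyPartitions;
        }
    }

    // Compares the host reported by the metadata with the local task assignment
    // for the partition the key maps to.
    static String diagnose(String reportedActiveHost, String thisHost, int partition,
                           Iterable<LocalThreadView> threads) {
        boolean metadataSaysActive = reportedActiveHost.equals(thisHost);
        boolean hasActive = false;
        boolean hasStandby = false;
        for (LocalThreadView thread : threads) {
            hasActive |= thread.activePartitions.contains(partition);
            hasStandby |= thread.standbyPartitions.contains(partition);
        }
        if (metadataSaysActive == hasActive) {
            return "CONSISTENT";
        }
        if (metadataSaysActive) {
            return hasStandby
                    ? "MISMATCH: metadata says active, local thread has only a standby task"
                    : "MISMATCH: metadata says active, no local task for the partition";
        }
        return "MISMATCH: metadata names another host, but a local thread has an active task";
    }

    public static void main(String[] args) {
        // The situation we observe: this host is reported as active, but only a
        // standby task for the (placeholder) partition exists locally.
        LocalThreadView thread = new LocalThreadView(
                Collections.emptySet(), new HashSet<>(Arrays.asList(3)));
        System.out.println(diagnose("localhost:8562", "localhost:8562", 3,
                Collections.singletonList(thread)));
    }
}
```

Logging this at each of the points where we re-ran queryMetadataForKey would show whether the metadata and the thread's task assignment ever agree.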
Could it be that there is a difference between the activeHost in the metadata
and the active tasks of the StreamThread?
Those of you who are also on the Confluent Community Slack might recognize
this message, as our CTO has posted it there as well.
Cheers,
D.