Hi All,

We are experiencing some weird behaviour with our interactive query service 
implementation.
This is the flow we’ve implemented:


  1.  kafkaStreams.queryMetadataForKey(store, key, serializer) returns 
activeHost HostInfo{host='localhost', port=8562} and standbyHosts [] for the 
store and partition where the key would reside. We are not interested in 
standby hosts, and we have an active host which we can call.
  2.  We make an HTTP call to host localhost:8562, asking for the key there.
  3.  Inside the 8562 host, we retrieve the store by calling 
kafkaStreams.store(parameters), using parameters with staleStores set to false.
  4.  We call kafkaStreams.state().equals(RUNNING) to make sure we’re in the 
RUNNING state.
  5.  Now we call store.get(key) in order to retrieve the value for the key 
from the store, if it has been stored there.
  6.  The get method on our store implementation calls the 
storeProvider.stores(storeName, storeType) method to iterate over all the 
stores available on the host.
  7.  The storeProvider is a WrappingStoreProvider, which calls 
storeProvider.stores(storeQueryParameters) for each 
StreamThreadStateStoreProvider it wraps (just one in our case).
  8.  As the logic inside that stores method finds that the StreamThread is in 
the RUNNING state, it retrieves the tasks based on 
storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() : 
streamThread.activeTasks(). Since we set staleStores to false in the params, 
the condition is false and streamThread.activeTasks() is used.
  9.  To our surprise, the streamThread.activeTasks() method returns an empty 
ArrayList, while the streamThread.allTasks().values() returns one StandbyTask 
for the store we’re looking for.
  10. As there appear to be no active tasks on this host for this store, we 
end up with the fabled InvalidStateStoreException: "The state store, 
<storeName>, may have migrated to another instance."
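
To make steps 8 to 10 concrete, here is a minimal, self-contained sketch of 
the task selection as we understand it. It is a simplified model only: the 
class, enum, and method names (StoreLookupSketch, Task, candidateTasks, 
countStores) and the store name "my-store" are hypothetical stand-ins, not 
the real Kafka Streams classes, and IllegalStateException stands in for 
InvalidStateStoreException.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the task selection described in steps 8 to 10.
// These are NOT the real Kafka Streams classes; all names are hypothetical.
final class StoreLookupSketch {

    enum TaskType { ACTIVE, STANDBY }

    static final class Task {
        final TaskType type;
        final String storeName;

        Task(TaskType type, String storeName) {
            this.type = type;
            this.storeName = storeName;
        }
    }

    // Mirrors the ternary from step 8: with stale stores enabled all tasks
    // are considered, otherwise only the active ones.
    static List<Task> candidateTasks(List<Task> allTasks, boolean staleStoresEnabled) {
        if (staleStoresEnabled) {
            return allTasks;
        }
        List<Task> active = new ArrayList<>();
        for (Task task : allTasks) {
            if (task.type == TaskType.ACTIVE) {
                active.add(task);
            }
        }
        return active;
    }

    // Counts the candidate tasks that hold the store and throws when there
    // are none, as in step 10.
    static int countStores(List<Task> allTasks, String storeName, boolean staleStoresEnabled) {
        int count = 0;
        for (Task task : candidateTasks(allTasks, staleStoresEnabled)) {
            if (task.storeName.equals(storeName)) {
                count++;
            }
        }
        if (count == 0) {
            throw new IllegalStateException(
                "The state store, " + storeName + ", may have migrated to another instance.");
        }
        return count;
    }

    public static void main(String[] args) {
        // The situation from step 9: the host holds only a standby task.
        List<Task> tasksOnHost = List.of(new Task(TaskType.STANDBY, "my-store"));

        // With stale stores enabled the standby task is found.
        System.out.println(countStores(tasksOnHost, "my-store", true));

        // With stale stores disabled no active task exists, so the lookup fails.
        try {
            countStores(tasksOnHost, "my-store", false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, a host that holds only a StandbyTask for the store can serve 
the lookup only when stale stores are enabled, which matches what we observe 
in steps 9 and 10.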

This is puzzling, as queryMetadataForKey returns an active host which, once 
queried, turns out to have only a standby task.
I have executed the queryMetadataForKey method on the active host as well, once 
before calling kafkaStreams.store in step 3, and another time between steps 4 
and 5. Each time the metadata is the same: the host we’re on at that moment is 
the active host.

Could it be there is a difference between activeHost and activeTask?

Those who are also on the Confluent Community Slack might recognize this 
message, as it has been posted there by our CTO as well.

Cheers,
D.
