On Thu, Jul 22, 2021 at 4:47 AM Steve Niemitz <[email protected]> wrote:
> I don't think I'd call it a bug? The cache doesn't differentiate between
> a state cell that existed but was cleared, and one that is missing from the
> cache (maybe it should?).
>

Filing this in my collection of problems caused by "nullable" vs "Optionable".
Conflating "not in cache" and "empty value" is definitely a bug. Luckily in
this case only a performance bug.

Kenn

> The side input fetcher clears the blocked state when it becomes
> unblocked:
>
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java#L241
>
> Given that interaction, once the side input becomes ready, every request
> for the blocked map will result in a cache miss, and a state lookup
> request. Changing the cache to cache tombstones for cleared cells is
> another possibility as well.
>
> On Thu, Jul 22, 2021 at 2:33 AM Reuven Lax <[email protected]> wrote:
>
>> So you're saying there's a bug in the caching logic that prevents the
>> side-input cache from working?
>>
>> On Wed, Jul 21, 2021 at 7:07 PM Steve Niemitz <[email protected]>
>> wrote:
>>
>>> I had opened a jira years ago [1] about this, but would like to actually
>>> fix it for real now, given that our users have started using streaming more
>>> and more.
>>>
>>> There's more detail in the jira, but basically side inputs in streaming
>>> pipelines on dataflow lead to pretty bad performance because they result in
>>> a state lookup request for each element.
>>>
>>> I think the best solution would be to stop storing null for the
>>> blockedMap, and instead store an empty map. This way there will
>>> (generally) be a cache hit when looking it up (the cache can't cache a
>>> null). The only issue here though is that something then needs to clean up
>>> the map. It seems like the StreamingSideInputDoFnRunner could probably set
>>> its own cleanup timer here to do that.
>>>
>>> I'm curious if there are other thoughts on better ways to do this?
>>>
>>> Also, as a side note, I was looking at the SideInputHandler
>>> implementation in runners-core, and I can't seem to see where the state
>>> that it maintains is ever cleaned up?
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-7745
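As a rough illustration of the tombstone option raised in the thread (and of the "not in cache" vs "empty value" distinction), here is a minimal Java sketch. It is hypothetical: TombstoneStateCache, put, clear, read, and the backend loader function are made-up names, not the Dataflow worker's actual state cache API, and the loader only stands in for a state lookup request. The idea is that a cleared cell is stored as Optional.empty() rather than evicted, so reads of the blocked state after it has been cleared remain cache hits.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical sketch, not Beam's real state cache: cleared cells are kept
 * as tombstones so "cleared" and "not cached" are distinct states.
 */
final class TombstoneStateCache<K, V> {
  // Key present = the cache knows the cell's state.
  // Optional.empty() = tombstone for a cell known to be cleared/empty.
  private final Map<K, Optional<V>> entries = new HashMap<>();
  private int backendReads = 0;

  /** Records a written value. */
  void put(K key, V value) {
    entries.put(key, Optional.of(value));
  }

  /** Records that the cell was cleared, instead of evicting the entry. */
  void clear(K key) {
    entries.put(key, Optional.empty());
  }

  /**
   * Returns the cell's value, or Optional.empty() if it is known to be empty.
   * Only a key absent from the cache triggers a (simulated) backend lookup.
   */
  Optional<V> read(K key, Function<K, Optional<V>> backend) {
    Optional<V> cached = entries.get(key); // null only if the key is not cached
    if (cached != null) {
      return cached; // cache hit: either a value or a tombstone
    }
    backendReads++;
    Optional<V> loaded = backend.apply(key);
    entries.put(key, loaded); // cache the result, including "empty"
    return loaded;
  }

  /** Number of lookups that fell through to the backend. */
  int backendReads() {
    return backendReads;
  }

  public static void main(String[] args) {
    TombstoneStateCache<String, String> cache = new TombstoneStateCache<>();
    Function<String, Optional<String>> backend = k -> Optional.empty();
    cache.put("blockedMap", "pending");
    cache.clear("blockedMap"); // e.g. the side input became ready
    for (int i = 0; i < 3; i++) {
      cache.read("blockedMap", backend); // each read is a cache hit
    }
    System.out.println(cache.backendReads()); // prints 0
  }
}
```

Here the presence of a key answers "is it cached?" and the Optional answers "does it have a value?", so the two questions can no longer be conflated. A real cache would also have to count tombstones toward its size and evict them, which this sketch leaves out.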
