Sorry for starting a new thread; I couldn't add a response to the existing thread at [1] for some reason. I have a very naive question about finding a temporary fix for the memory issue raised in [1]. I know Jan suggested using two successive overlapping fixed windows with an offset as a temporary way to dedup the events. However, I am wondering whether a single fixed window of, say, 1 day followed by a deduplicate function is a good alternative. I assume that at the end of each window all the timers will be cleared, which means some duplicates will be missed, but I am OK with that.
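For context, here is roughly how I understood Jan's suggestion: dedup under one fixed windowing, then dedup again under a second fixed windowing whose boundaries are shifted by some offset, so duplicates that straddle a boundary of the first windowing still land in the same window of the second. Please correct me if I'm misreading it; the dummy input, the 12-hour offset, and the 30-minute dedup duration below are just placeholder values for illustration:

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.deduplicate import DeduplicatePerKey

ONE_DAY = 24 * 60 * 60

with beam.Pipeline() as p:
    (
        p
        # Dummy keyed input just for illustration; in my pipeline this is
        # FlinkYelpDatapipelineInput() followed by the MapToTuple step.
        | "Create" >> beam.Create([("id-1", "event-a"), ("id-1", "event-a")])
        # First fixed windowing + dedup.
        | "Window1" >> beam.WindowInto(window.FixedWindows(ONE_DAY))
        | "Dedup1" >> DeduplicatePerKey(processing_time_duration=30 * 60)
        # Second fixed windowing, offset by half a window, + dedup, so that
        # duplicates split across a boundary of the first windowing can
        # still be caught here.
        | "Window2" >> beam.WindowInto(
            window.FixedWindows(ONE_DAY, offset=ONE_DAY // 2)
        )
        | "Dedup2" >> DeduplicatePerKey(processing_time_duration=30 * 60)
        | "Print" >> beam.Map(print)
    )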
My pipeline looks something like the following in this case:

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterCount, AccumulationMode
from apache_beam.transforms.deduplicate import DeduplicatePerKey

# p: beam.Pipeline
(
    p
    | "ReadData" >> FlinkYelpDatapipelineInput()
    | "MapToTuple" >> beam.Map(lambda msg: (msg.id, msg))
    | "Window" >> beam.WindowInto(
        window.FixedWindows(1 * 24 * 60 * 60),
        trigger=AfterCount(1),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    # DeduplicatePerKey function [2]
    | "Deduplicate" >> DeduplicatePerKey(processing_time_duration=30 * 60)
    | "UnmapTuple" >> beam.Map(lambda msg: msg[1])
    # Very simple custom ParDo function
    | "SubmitTask" >> beam.ParDo(ProcessEventFn())
)

It seemed to work when I tested it, but I wanted to double-check, especially considering the following statement from the Beam documentation [3]:

"If you set a windowing function using the Window transform, each element is assigned to a window, but the windows are not considered until GroupByKey or Combine aggregates across a window and key."

[1] https://lists.apache.org/thread.html/rae268806035688b77646195505e5b7a56568a38feb1e52d6341feedd%40%3Cdev.beam.apache.org%3E
[2] https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey
[3] https://beam.apache.org/documentation/programming-guide/#windowing