We implemented a streaming query with an aggregation on event time, using a watermark. I'm wondering why the aggregation state is not being cleaned up. According to the documentation, old aggregation state should be cleared when watermarks are used, and we also don't see any of the documented conditions [1] under which state would not be cleaned up applying to our query.
We do something like this:

    event_schema = T.StructType([
        T.StructField("remote_ip", T.StringType(), True),
        T.StructField("username", T.StringType(), True),
    ])

    stream_writer = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "input_topic")
        .option("startingOffsets", "earliest")
        .load()
        .withColumn("key", F.col("key").cast(T.StringType()))
        .withColumn("value", F.col("value").cast(T.StringType()))
        .withColumn("event", F.from_json(F.col("value"), event_schema))
        .select("timestamp", "event.*")
        .where("username rlike '[^@]+@[^\.]\..+'")
        # the watermarked aggregation in question:
        .withWatermark("timestamp", "600 seconds")
        .groupBy(
            F.window("timestamp", "600 seconds", "30 seconds"),
            F.col("remote_ip")
        )
        .agg(
            F.approx_count_distinct("username").alias("distinct_username"),
            F.collect_set("username").alias("all_usernames"),
        )
        .where(F.expr("distinct_username >= 2"))
        .select("remote_ip")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "output_topic")
        .option("checkpointLocation", "hdfs://...")
        .trigger(processingTime="10 seconds")
        .outputMode("update")
        .start()
    )

After running this for several hours, we hit a heap OutOfMemoryError in our driver application:

http://apache-spark-user-list.1001560.n3.nabble.com/file/t8542/56_001.png

Analyzing the heap dump reveals:

    One instance of "org.apache.spark.status.ElementTrackingStore" loaded by
    "sun.misc.Launcher$AppClassLoader @ 0x800223d0" occupies 1.793.945.416
    (93,59%) bytes. The memory is accumulated in one instance of
    "java.util.concurrent.ConcurrentHashMap$Node[]" loaded by
    "<system class loader>".

I would expect memory to peak only for the window duration (which is 10 minutes in our case), but it seems the event state is never cleaned. Any ideas?
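For reference, this is the cleanup behavior we expect from the watermark, sketched in plain Python. This is purely illustrative of our mental model, not Spark internals; the state layout and numbers are made up:

    # Illustrative sketch (not Spark internals): state for a windowed
    # aggregation, keyed by window end time. With a watermark delay of
    # 600 seconds, any window whose end falls at or before
    # (max event time seen - delay) should be evicted.

    DELAY = 600  # watermark delay in seconds, as in withWatermark("timestamp", "600 seconds")

    def evict(state, max_event_time):
        """Drop window state older than the watermark (max event time - delay)."""
        watermark = max_event_time - DELAY
        return {window_end: usernames
                for window_end, usernames in state.items()
                if window_end > watermark}

    # Hypothetical state; window end times in seconds since epoch.
    state = {1000: {"alice"}, 1600: {"bob"}, 2800: {"carol"}}
    # Latest event time seen is 2800, so the watermark is 2200 and the
    # windows ending at 1000 and 1600 should be dropped.
    state = evict(state, 2800)

So state (and therefore driver/executor memory for it) should stay bounded by roughly one watermark delay plus one window duration, rather than grow without limit.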
Regards,
Andreas

[1] http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#conditions-for-watermarking-to-clean-aggregation-state