We implemented a streaming query that aggregates on event time with a
watermark, and I'm wondering why the aggregation state is not being
cleaned up. According to the documentation, old aggregation state should
be cleared once a watermark is set, and as far as we can tell we don't
violate any of the documented conditions [1] for watermarking to clean
aggregation state.

We do something like this:

from pyspark.sql import functions as F
from pyspark.sql import types as T

event_schema = T.StructType([
    T.StructField("remote_ip", T.StringType(), True),
    T.StructField("username", T.StringType(), True)
])

# spark is an existing SparkSession; start() returns a StreamingQuery handle
query = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "input_topic")
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("key", F.col("key").cast(T.StringType()))
    .withColumn("value", F.col("value").cast(T.StringType()))
    .withColumn("event", F.from_json(F.col("value"), event_schema))
    .select("timestamp", "event.*")
    # keep only e-mail-like usernames
    .where(r"username rlike '[^@]+@[^\.]\..+'")
    # allow events to arrive up to 10 minutes late
    .withWatermark("timestamp", "600 seconds")
    # 10-minute windows, sliding every 30 seconds
    .groupBy(
        F.window("timestamp", "600 seconds", "30 seconds"),
        F.col("remote_ip")
    )
    .agg(
        F.approx_count_distinct("username").alias("distinct_username"),
        F.collect_set("username").alias("all_usernames"),
    )
    .where(F.expr("distinct_username >= 2"))
    # the Kafka sink requires a column named "value"
    .selectExpr("remote_ip AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output_topic")
    .option("checkpointLocation", "hdfs://...")
    .trigger(processingTime="10 seconds")
    .outputMode("update")
    .start()
)
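
To check whether the watermark actually advances (condition-wise we
believe we are fine), we poll the query's progress. A minimal sketch;
query is the StreamingQuery handle returned by start() above:

import time

# The "eventTime" section of each progress report carries the current
# watermark, so we can see whether it moves forward as events arrive.
while query.isActive:
    progress = query.lastProgress
    if progress is not None:
        print(progress.get("eventTime", {}).get("watermark"))
    time.sleep(10)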

When we run this for several hours we hit a heap OutOfMemoryError in the
driver (screenshot:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8542/56_001.png>)

Analyzing the heap dump reveals one leak suspect: a single instance of
"org.apache.spark.status.ElementTrackingStore", loaded by
"sun.misc.Launcher$AppClassLoader @ 0x800223d0", occupies 1,793,945,416
bytes (93.59% of the heap). The memory is accumulated in one instance of
"java.util.concurrent.ConcurrentHashMap$Node[]" loaded by the system
class loader.


I would expect memory usage to peak only over roughly the window
duration (10 minutes in our case), but it looks like the aggregation
state is never cleaned up.
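
For reference, this is how we watch the state size (same progress object
as in the sketch above; stateOperators is part of the standard progress
report):

# Each stateful operator reports how many rows its state store holds;
# with working watermark cleanup we would expect numRowsTotal to level
# off after roughly one window duration.
for op in (query.lastProgress or {}).get("stateOperators", []):
    print(op["numRowsTotal"], op["memoryUsedBytes"])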

Any ideas?

Regards
Andreas

[1]
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#conditions-for-watermarking-to-clean-aggregation-state


