Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-23 Thread weand
Thanks for the clarification. So it really seems to be a Spark UI OOM issue.

After setting:
--conf spark.sql.ui.retainedExecutions=10
--conf spark.worker.ui.retainedExecutors=10
--conf spark.worker.ui.retainedDrivers=10
--conf spark.ui.retainedJobs=10
--conf spark.ui.retainedStages=10
--conf spark.ui.retainedTasks=10
--conf spark.streaming.ui.retainedBatches=10

...driver memory consumption still increases constantly over time (ending in
OOM).
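
For reference, the same settings could also be applied when building the
SparkSession; a minimal sketch (the app name is a placeholder, and the
worker-side settings still go via --conf):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("streaming-aggregation")  # placeholder name
    .config("spark.sql.ui.retainedExecutions", "10")
    .config("spark.ui.retainedJobs", "10")
    .config("spark.ui.retainedStages", "10")
    .config("spark.ui.retainedTasks", "10")
    .config("spark.streaming.ui.retainedBatches", "10")
    .getOrCreate())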

TOP 10 Records by Heap Consumption:

Class Name                                                     | Objects | Shallow Heap | Retained Heap
---------------------------------------------------------------|---------|--------------|------------------
org.apache.spark.status.ElementTrackingStore                   |       1 |           40 | >= 1.793.945.416
org.apache.spark.util.kvstore.InMemoryStore                    |       1 |           24 | >= 1.793.944.760
org.apache.spark.util.kvstore.InMemoryStore$InstanceList       |      13 |          416 | >= 1.792.311.104
org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper        |  16.472 |      527.104 | >= 1.430.379.120
org.apache.spark.sql.execution.ui.SparkPlanGraphNodeWrapper    | 378.856 |    9.092.544 | >= 1.415.224.880
org.apache.spark.sql.execution.ui.SparkPlanGraphNode           | 329.440 |   10.542.080 | >= 1.389.888.112
org.apache.spark.sql.execution.ui.SparkPlanGraphClusterWrapper |  49.416 |    1.976.640 | >=   957.701.152
org.apache.spark.sql.execution.ui.SQLExecutionUIData           |   1.000 |       64.000 | >=   344.103.096
org.apache.spark.sql.execution.ui.SQLPlanMetric                | 444.744 |   14.231.808 | >=    14.231.808
org.apache.spark.sql.execution.ui.SparkPlanGraphEdge           | 312.968 |   10.014.976 | >=    10.014.976
---------------------------------------------------------------|---------|--------------|------------------

>300k instances each of SparkPlanGraphNodeWrapper, SparkPlanGraphNode and
SQLPlanMetric.
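
For context, the table above comes from a driver heap dump, captured roughly
like this (PID and output path are placeholders) and opened in a heap
analyzer such as Eclipse MAT:

jmap -dump:live,format=b,file=/tmp/driver-heap.hprof <driver-pid>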

BTW: we are using Spark 2.3.0.

Shall I file a new Jira for this memory leak in the Spark UI? The only
related issue I found is https://issues.apache.org/jira/browse/SPARK-15716,
but that seems to be something different.

We are trying spark.ui.enabled=false in the meantime.


Tathagata Das wrote
> Just to be clear, these screenshots are about the memory consumption of the
> driver. So this has nothing to do with streaming aggregation state, which is
> kept in the memory of the executors, not the driver.
>
> On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim <kabhwan@...> wrote:
>
>> 1. Could you share your Spark version?
>> 2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it
>> helps? This configuration is available in 2.3.0, and the default value is
>> 1000.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On May 22, 2018 (Tue) at 4:29 PM, weand <andreas.weise@...> wrote:
>>
>>> You can see it even better on this screenshot:
>>>
>>> TOP Entries Collapsed #2
>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/t8542/27_001.png
>>>
>>> Sorry for the spam, attached a not so perfect screen in the mail before.



Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread weand
You can see it even better on this screenshot:

TOP Entries Collapsed #2
http://apache-spark-user-list.1001560.n3.nabble.com/file/t8542/27_001.png

Sorry for the spam; the screenshot attached to the previous mail was less
readable.






Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread weand
Instances of org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper are not
cleaned up, see TOP Entries Collapsed #2:

TOP Entries All (screenshot)

TOP Entries Collapsed #1 (screenshot)

TOP Entries Collapsed #2 (screenshot)






Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-19 Thread weand
Nobody has any idea...?

Is filtering after aggregation supported in Structured Streaming, but maybe
buggy? See the following line in the example from the earlier mail:
...
.where(F.expr("distinct_username >= 2"))
...







OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-16 Thread weand
We implemented a streaming query with aggregation on event time with a
watermark. I'm wondering why the aggregation state is not cleaned up.
According to the documentation, old aggregation state should be cleared when
using watermarks. We also don't hit any of the conditions [1] that would
prevent state from being cleaned up.

We do something like this:

# F = pyspark.sql.functions, T = pyspark.sql.types; `spark` is an existing SparkSession
from pyspark.sql import functions as F, types as T

event_schema = T.StructType([
    T.StructField("remote_ip", T.StringType(), True),
    T.StructField("username", T.StringType(), True)
])

stream_writer = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "input_topic")
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("key", F.col("key").cast(T.StringType()))
    .withColumn("value", F.col("value").cast(T.StringType()))
    .withColumn("event", F.from_json(F.col("value"), event_schema))
    .select("timestamp", "event.*")
    .where("username rlike '[^@]+@[^\.]\..+'")
    # event-time aggregation with watermark; state older than the watermark
    # should be dropped
    .withWatermark("timestamp", "600 seconds")
    .groupBy(
        F.window("timestamp", "600 seconds", "30 seconds"),
        F.col("remote_ip")
    )
    .agg(
        F.approx_count_distinct("username").alias("distinct_username"),
        F.collect_set("username").alias("all_usernames"),
    )
    .where(F.expr("distinct_username >= 2"))
    .select("remote_ip")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output_topic")
    .option("checkpointLocation", "hdfs://...")
    .trigger(processingTime="10 seconds")
    .outputMode("update")
    .start()
)

When running this for several hours, we face a heap OutOfMemoryError in our
driver application:

(screenshot of driver memory consumption)

Analyzing the heap dump reveals:

One instance of "org.apache.spark.status.ElementTrackingStore" loaded by
"sun.misc.Launcher$AppClassLoader @ 0x800223d0" occupies 1.793.945.416
(93,59%) bytes. The memory is accumulated in one instance of
"java.util.concurrent.ConcurrentHashMap$Node[]" loaded by "".


I would expect memory to peak only over roughly the window duration (10
minutes in our case), but it seems the event state is never cleaned up.
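
As an additional check, state growth can be watched from the query handle
itself; a rough sketch (polling the stream_writer handle returned by start()
above once per minute):

import time

while stream_writer.isActive:
    progress = stream_writer.lastProgress
    if progress:
        for op in progress.get("stateOperators", []):
            # numRowsTotal should stop growing once old windows are evicted
            print(op.get("numRowsTotal"), op.get("memoryUsedBytes"))
    time.sleep(60)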

Any ideas?

Regards
Andreas

[1]
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#conditions-for-watermarking-to-clean-aggregation-state






Re: Hive From Spark: Jdbc VS sparkContext

2017-10-10 Thread weand
Is Hive from Spark via JDBC working for you? If it does, I would be
interested in your setup :-)

We can't get this working. See the bug here, especially my last comment:
https://issues.apache.org/jira/browse/SPARK-21063
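
For context, this is roughly what we are attempting (a sketch only; host,
port, database and table names are placeholders, and the Hive JDBC driver is
assumed to be on the classpath):

df = (spark.read
    .format("jdbc")
    .option("url", "jdbc:hive2://hiveserver2-host:10000/default")
    .option("driver", "org.apache.hive.jdbc.HiveDriver")
    .option("dbtable", "some_table")
    .load())
df.show()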

Regards
Andreas


