It does not look like it. Here is the output of
"grep -A2 -i waiting spark_tdump.log":

"RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"task-result-getter-1" daemon prio=5 tid=101 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
--
"qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon prio=5 tid=193 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RecurringTimer - Kinesis Checkpointer - Worker localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-0" daemon prio=5 tid=84 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"pool-28-thread-1" prio=5 tid=92 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon prio=5 tid=185 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
--
"Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
--
"qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"SparkListenerBus" daemon prio=5 tid=18 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"block-manager-slave-async-thread-pool-6" daemon prio=5 tid=179 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RMI Scheduler(0)" daemon prio=5 tid=151 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-1" daemon prio=5 tid=99 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"block-manager-ask-thread-pool-4" daemon prio=5 tid=180 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"pool-28-thread-2" prio=5 tid=93 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"MAP_OUTPUT_TRACKER cleanup timer" daemon prio=5 tid=45 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"ForkJoinPool-3-worker-5" daemon prio=5 tid=190 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"java-sdk-http-connection-reaper" daemon prio=5 tid=75 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at com.amazonaws.http.IdleConnectionReaper.run(IdleConnectionReaper.java:112)
--
"pool-25-thread-1" prio=5 tid=96 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"StreamingListenerBus" daemon prio=5 tid=71 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"JMX server connection timeout 152" daemon prio=5 tid=152 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
--
"wal-batching-thread-pool-0" daemon prio=5 tid=137 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"block-manager-slave-async-thread-pool-7" daemon prio=5 tid=181 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"process reaper" daemon prio=10 tid=97 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"block-manager-ask-thread-pool-5" daemon prio=5 tid=183 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"netty-rpc-env-timeout" daemon prio=5 tid=60 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"block-manager-slave-async-thread-pool-8" daemon prio=5 tid=182 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RefreshMaterialTask" daemon prio=5 tid=74 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.util.TimerThread.mainLoop(Timer.java:552)
--
"Kinesis Receiver 0" daemon prio=5 tid=91 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:347)
--
"RecurringTimer - BlockGenerator" daemon prio=5 tid=85 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"driver-heartbeater" daemon prio=5 tid=63 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"sparkDriverActorSystem-scheduler-1" daemon prio=5 tid=32 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at akka.actor.LightArrayRevolverScheduler.waitNanos(Scheduler.scala:226)
--
"sparkDriverActorSystem-akka.actor.default-dispatcher-4" daemon prio=5 tid=35 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"task-result-getter-2" daemon prio=5 tid=159 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"streaming-job-executor-0" daemon prio=5 tid=95 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"dag-scheduler-event-loop" daemon prio=5 tid=62 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"task-result-getter-3" daemon prio=5 tid=161 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"Finalizer" daemon prio=8 tid=3 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
--
"Thread-19" daemon prio=5 tid=86 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"BatchedWriteAheadLog Writer" daemon prio=5 tid=81 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"heartbeat-receiver-event-loop-thread" daemon prio=5 tid=59 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RecurringTimer - JobGenerator" daemon prio=5 tid=70 TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"ForkJoinPool-3-worker-7" daemon prio=5 tid=194 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"task-result-getter-0" daemon prio=5 tid=100 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"main" prio=5 tid=1 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"JobScheduler" daemon prio=5 tid=80 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RecurringTimer - BlockGenerator" daemon prio=5 tid=87 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"dispatcher-event-loop-1" daemon prio=5 tid=20 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"JobGenerator" daemon prio=5 tid=82 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"refresh progress" daemon prio=5 tid=49 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.util.TimerThread.mainLoop(Timer.java:552)
--
"SPARK_CONTEXT cleanup timer" daemon prio=5 tid=48 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"Timer-0" daemon prio=5 tid=61 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
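
Since grepping for "waiting" only lists parked and sleeping threads, a
programmatic check may be more conclusive. Below is a minimal sketch (not
something taken from Spark) that asks the JDK's ThreadMXBean for deadlocked
threads. The class name DeadlockCheck and the method findDeadlocks are made up
for illustration, and the code would have to run inside the JVM being
inspected (the same MXBean should also be reachable over JMX).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, named for illustration only.
final class DeadlockCheck {

    // Returns one description per deadlocked thread; empty list if none found.
    static List<String> findDeadlocks() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Looks for cycles over both object monitors and java.util.concurrent
        // ownable synchronizers; returns null when no deadlock is detected.
        long[] ids = mx.findDeadlockedThreads();
        List<String> out = new ArrayList<>();
        if (ids == null) {
            return out;
        }
        for (ThreadInfo info : mx.getThreadInfo(ids)) {
            if (info == null) {
                continue; // thread terminated between the two calls
            }
            out.add(info.getThreadName()
                    + " (" + info.getThreadState() + ")"
                    + " waiting on " + info.getLockName()
                    + " held by " + info.getLockOwnerName());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> deadlocks = findDeadlocks();
        if (deadlocks.isEmpty()) {
            System.out.println("No deadlocked threads found");
        } else {
            deadlocks.forEach(System.out::println);
        }
    }
}
```

An empty result would only rule out a classic lock cycle. A thread that is
simply waiting for something that never arrives would not be reported.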


On Fri, Feb 5, 2016 at 1:31 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> I guess there may be a deadlock in BlockGenerator. Could you check for one
> yourself?
>
> On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl <udofholl1...@gmail.com> wrote:
>
>> Thank you for your response
>>
>> Unfortunately I cannot share a thread dump. What are you looking for
>> exactly?
>>
>> Here is the list of the 50 biggest objects (in descending order of
>> retained size):
>>
>> java.util.concurrent.ArrayBlockingQueue#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> scala.concurrent.forkjoin.ForkJoinPool#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.storage.MemoryStore#
>> java.util.LinkedHashMap#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>> scala.concurrent.forkjoin.ForkJoinTask[]#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>> scala.concurrent.forkjoin.ForkJoinTask[]#
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
>> scala.concurrent.forkjoin.ForkJoinTask[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> org.apache.spark.streaming.receiver.BlockGenerator$Block#
>> scala.collection.mutable.ArrayBuffer#
>> java.lang.Object[]#
>> scala.collection.Iterator$$anon$
>> org.apache.spark.InterruptibleIterator#
>> scala.collection.IndexedSeqLike$Elements#
>> scala.collection.mutable.ArrayOps$ofRef#
>> java.lang.Object[]#
>>
>>
>>
>>
>> On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Udo,
>>>
>>> mapWithState usually uses much more memory than updateStateByKey since
>>> it caches the states in memory.
>>>
>>> However, from your description, it looks like BlockGenerator cannot push
>>> data into BlockManager, so there may be something wrong in BlockGenerator.
>>> Could you share the top 50 objects in the heap dump and the thread dump?
>>>
>>>
>>> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl <udofholl1...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>>>> see a huge increase in memory usage. Most of it is a massive "BlockGenerator"
>>>> (which points to a massive "ArrayBlockingQueue" that in turn points to a
>>>> huge "Object[]").
>>>>
>>>> I'm pretty sure it has to do with my code, but I barely changed
>>>> anything in the code. Just adapted the function.
>>>>
>>>> Did anyone run into this?
>>>>
>>>> Best regards,
>>>> Udo.
>>>>
>>>
>>>
>>
>
