Re: Spark Streaming Stuck After 10mins Issue...
It turns out there was a bug in the code that caused an infinite loop some time after startup. :)
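For anyone who hits the same symptom: the kind of bug meant here is a loop in the per-record processing whose exit condition never advances, which shows up as a single executor thread stuck in RUNNABLE while the rest of the job waits on it (compare the thread dump later in this thread). A purely illustrative sketch, not the actual job code:

// Illustrative only: the record type, process step, and loop are hypothetical.
def process(record: String): Boolean = record.nonEmpty   // stand-in per-record step

def handlePartition(partition: Iterator[String]): Unit =
  partition.foreach { record =>
    var done = false
    while (!done) {
      process(record)
      // bug: `done` is never set to true, so a single record spins forever
      // and the task never completes
    }
  }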
Re: Spark Streaming Stuck After 10mins Issue...
And here is the thread dump. It seems every worker is waiting for Executor #6's Thread 95 (sparkExecutor-akka.actor.default-dispatcher-22, RUNNABLE) to complete:

Thread 41: BLOCK_MANAGER cleanup timer (WAITING)
Thread 42: BROADCAST_VARS cleanup timer (WAITING)
Thread 44: shuffle-client-0 (RUNNABLE)
Thread 45: shuffle-server-0 (RUNNABLE)
Thread 47: Driver Heartbeater (TIMED_WAITING)
Thread 48: Executor task launch worker-0 (RUNNABLE)
Thread 56: threadDeathWatcher-2-1 (TIMED_WAITING)
Thread 81: sparkExecutor-akka.actor.default-dispatcher-18 (WAITING)
Thread 95: sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE)
**
sun.management.ThreadImpl.dumpThreads0(Native Method)
sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:446)
org.apache.spark.util.Utils$.getThreadDump(Utils.scala:1777)
org.apache.spark.executor.ExecutorActor$$anonfun$receiveWithLogging$1.applyOrElse(ExecutorActor.scala:38)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.spark.executor.ExecutorActor.aroundReceive(ExecutorActor.scala:34)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
akka.dispatch.Mailbox.run(Mailbox.scala:220)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
**
Thread 112: sparkExecutor-akka.actor.default-dispatcher-25 (WAITING)
Spark Streaming Stuck After 10mins Issue...
Hi,

I have a Spark Streaming application that reads messages from Kafka (multiple topics) and aggregates the data via updateStateByKey, running on 50 Spark workers, each with 1 core and 6 GB of RAM. It works fine for the first 10 minutes or so, but then it gets stuck in the foreachRDD function. Below are the log (repeating while stuck), the executor status, and the code.

Log:

15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_873_piece0
15/06/06 12:55:20 INFO BlockManager: Removing broadcast 875
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875_piece0
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875_piece0 of size 1820 dropped from memory (free 3369792320)
15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_875_piece0
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875 of size 2624 dropped from memory (free 3369794944)
15/06/06 12:56:13 INFO MemoryStore: ensureFreeSpace(1650825) called with curMem=24335467, maxMem=3394130411
15/06/06 12:56:13 INFO MemoryStore: Block input-0-1433620573400 stored as bytes in memory (estimated size 1612.1 KB, free 3.1 GB)
15/06/06 12:56:13 INFO BlockManagerMaster: Updated info of block input-0-1433620573400
15/06/06 12:56:20 INFO MemoryStore: ensureFreeSpace(1682877) called with curMem=25986292, maxMem=3394130411
15/06/06 12:56:20 INFO MemoryStore: Block input-0-1433620579800 stored as bytes in memory (estimated size 1643.4 KB, free 3.1 GB)
15/06/06 12:56:20 INFO BlockManagerMaster: Updated info of block input-0-1433620579800
15/06/06 12:56:25 INFO MemoryStore: ensureFreeSpace(1642661) called with curMem=27669169, maxMem=3394130411
15/06/06 12:56:25 INFO MemoryStore: Block input-0-1433620585000 stored as bytes in memory (estimated size 1604.2 KB, free 3.1 GB)
15/06/06 12:56:25 INFO BlockManagerMaster: Updated info of block input-0-1433620585000

Executor status (stage task table):

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Input Size / Records     Shuffle Read Size / Records  Errors
1      9087  0        SUCCESS  PROCESS_LOCAL   40 / 10.10.10.10    2015/06/06 12:54:26  3 s                7.7 MB (memory) / 11662  337.2 KB / 6588
0      9086  0        RUNNING  PROCESS_LOCAL   6 / 10.10.10.10     2015/06/06 12:54:26  3.7 min   2 s      7.8 MB (memory) / 11792  327.4 KB / 6422

Memory: 39.0 MB Used (160.1 GB Total)
Disk: 0.0 B Used

Executor ID  Address            RDD Blocks  Memory Used       Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Input      Shuffle Read  Shuffle Write
6            10.10.10.10:49298  2           15.6 MB / 3.2 GB  0.0 B      1             0             412             413          7.0 m      1291.3 MB  8.3 MB        1437.1 KB
40           10.10.10.10:37480  3           23.4 MB / 3.2 GB  0.0 B      0             0             413             413          7.1 m      1288.6 MB  10.8 MB       4.0 MB

Sample code:

val stateDStream = analyticsDStream.updateStateByKey[StreamBase](
  updateAnalyticsIterFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  true)

stateDStream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    partition.foreach(record => {
      // Do nothing, and it is still stuck
    })
  })
})

There are no error messages, and the memory usage seems fine. Is there any reason it would get stuck? How can I resolve this issue?

Thank you for your help,
Eason
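For readers unfamiliar with this overload: updateStateByKey called with a Partitioner and a rememberPartitioner flag expects an iterator-based update function. The post does not show updateAnalyticsIterFunc or StreamBase, so the following is a purely hypothetical sketch of the shape such a function must have (the key type, state type, and aggregation logic are all assumptions):

// Hypothetical state type and update function, shown only to illustrate the
// Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] shape this overload requires.
case class StreamBase(count: Long)

def updateStateIterFunc(
    records: Iterator[(String, Seq[StreamBase], Option[StreamBase])]
  ): Iterator[(String, StreamBase)] =
  records.map { case (key, newValues, state) =>
    // fold this batch's values into the carried state (placeholder aggregation)
    val merged = newValues.foldLeft(state.getOrElse(StreamBase(0L))) { (acc, v) =>
      StreamBase(acc.count + v.count)
    }
    (key, merged)
  }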
Spark Streaming Not Reading Messages From Multiple Kafka Topics
Hi all,

I'm building a Spark Streaming application that continuously reads multiple Kafka topics at the same time. However, I found a weird issue: it reads only a few hundred messages and then stops reading any more. If I change the three topics to a single topic, it is fine and keeps consuming. Below is the code I have.

val consumerThreadsPerInputDstream = 1
val topics = Map(
  "raw_0" -> consumerThreadsPerInputDstream,
  "raw_1" -> consumerThreadsPerInputDstream,
  "raw_2" -> consumerThreadsPerInputDstream)
val msgs = KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka", "group01", topics).map(_._2)
...

Why does it stop consuming after a few hundred messages when reading three topics? How can I resolve this issue?

Thank you for your help,
Eason
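In case it helps anyone who finds this thread: a common alternative with the receiver-based API (a sketch, not something from this post) is to create one stream per topic and union them, so each topic gets its own receiver. Note that each receiver occupies one core, so the application needs more cores than receivers.

import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: one receiver-based stream per topic, unioned into a single DStream.
// `ssc` is the existing StreamingContext; ZooKeeper address and group id
// are copied from the snippet above.
val topicNames = Seq("raw_0", "raw_1", "raw_2")
val streams = topicNames.map { topic =>
  KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka", "group01", Map(topic -> 1)).map(_._2)
}
val msgs = ssc.union(streams)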
Timeout Issues from Spark 1.2.0+
Hi all,

I'm trying to run the sample Spark application on version 1.2.0 and above. However, I've encountered a weird issue, shown below. The issue is only seen in v1.2.0 and above; v1.1.0 and v1.1.1 are fine.

The sample code:

val sc: SparkContext = new SparkContext(conf)
val NUM_SAMPLES = 10
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

The exception:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/19 01:10:17 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/03/19 01:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/19 01:10:17 INFO SecurityManager: Changing view acls to: hduser,eason.hu
15/03/19 01:10:17 INFO SecurityManager: Changing modify acls to: hduser,eason.hu
15/03/19 01:10:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser, eason.hu); users with modify permissions: Set(hduser, eason.hu)
15/03/19 01:10:18 INFO Slf4jLogger: Slf4jLogger started
15/03/19 01:10:18 INFO Remoting: Starting remoting
15/03/19 01:10:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@hduser-07:59122]
15/03/19 01:10:18 INFO Utils: Successfully started service 'driverPropsFetcher' on port 59122.
15/03/19 01:10:21 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@192.168.1.53:65001] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://sparkDriver@192.168.1.53:65001]].
15/03/19 01:10:48 ERROR UserGroupInformation: PriviledgedActionException as:eason.hu (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        ... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:144)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
        ... 7 more

Do you have any clues why this happens only on v1.2.0 and above? How can I resolve this issue?

Thank you very much,
Eason
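One thing worth double-checking, given the failed association above (a hedged sketch, not a confirmed fix for this thread): whether the executors can actually reach the driver's advertised host and port. spark.driver.host and spark.driver.port are standard Spark settings; the values below are placeholders based on the address in the log.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: pin the driver's advertised address and port to something the
// workers can reach, and make sure the port is open between the machines.
val conf = new SparkConf()
  .setAppName("PiSample")
  .set("spark.driver.host", "192.168.1.53")   // address reachable from the workers
  .set("spark.driver.port", "65001")          // fixed port instead of a random one
val sc = new SparkContext(conf)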
Upgrade from Spark 1.1.0 to 1.1.1+ Issues
Hi all,

I've been using Spark 1.1.0 for a while and would now like to upgrade to Spark 1.1.1 or above. However, it throws the following errors:

18:05:31.522 [sparkDriver-akka.actor.default-dispatcher-3] ERROR TaskSchedulerImpl - Lost executor 37 on hcompute001: remote Akka client disassociated
18:05:31.530 [sparkDriver-akka.actor.default-dispatcher-3] WARN TaskSetManager - Lost task 0.0 in stage 1.0 (TID 0, hcompute001): ExecutorLostFailure (executor lost)
18:05:31.567 [sparkDriver-akka.actor.default-dispatcher-2] ERROR TaskSchedulerImpl - Lost executor 3 on hcompute001: remote Akka client disassociated
18:05:31.568 [sparkDriver-akka.actor.default-dispatcher-2] WARN TaskSetManager - Lost task 1.0 in stage 1.0 (TID 1, hcompute001): ExecutorLostFailure (executor lost)
18:05:31.988 [sparkDriver-akka.actor.default-dispatcher-23] ERROR TaskSchedulerImpl - Lost executor 24 on hcompute001: remote Akka client disassociated

Do you know what may have gone wrong? I didn't change any code, just the version of Spark.

Thank you all,
Eason
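For what it's worth, one thing that commonly matters on this kind of upgrade (a hedged note, not from the original thread): the Spark version the application is compiled against should match the version deployed on the cluster. In an sbt build that would look roughly like the following; the version numbers and scopes are assumptions:

// build.sbt sketch: keep the compile-time Spark version in lockstep with the cluster
libraryDependencies += "org.apache.spark" %% "spark-core"      % "1.1.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided"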
Re: Setup Remote HDFS for Spark
Unfortunately, whether Spark and HDFS can run on the same machines is not under our control. :( Right now we have Spark and HDFS running on different machines. In this case, is it still possible to hook up a remote HDFS with Spark so that we can use Spark Streaming checkpoints?

Thank you for your help.

Best,
EH
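For reference, a minimal sketch of pointing Spark Streaming checkpoints at a remote HDFS cluster; the namenode host, port, and path are placeholders, and the assumption is only that the Spark machines have network access to the namenode and datanodes:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: checkpoint to an HDFS cluster running on different machines by using
// a fully qualified hdfs:// URI (placeholder host, port, and path).
val conf = new SparkConf().setAppName("RemoteCheckpointExample")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs://namenode.example.com:8020/spark/checkpoints")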
Setup Remote HDFS for Spark
Hi,

Is there any way to set up a remote HDFS for Spark (more specifically, for Spark Streaming checkpoints)? The reason I'm asking is that our Spark and HDFS do not run on the same machines. I've looked around but still have no clue so far.

Thanks,
EH