RE: Spark on YARN
You'd better also check the NodeManager log; sometimes this happens because your memory usage exceeds the limit configured for the YARN container. I've met a similar problem before; here is the warning from the NodeManager log:

2015-07-07 17:06:07,141 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=17385,containerID=container_1436259427993_0001_02_01] is running beyond virtual memory limits. Current usage: 318.1 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.

The default pmem-to-vmem ratio is 2.1, but it seems the executor requires more vmem when it starts, so the NodeManager kills it. If you meet a similar problem, you could increase the configuration "yarn.nodemanager.vmem-pmem-ratio".

Thanks
Jerry

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Thursday, July 30, 2015 4:36 PM
To: Jeetendra Gangele
Cc: user
Subject: Re: Spark on YARN

15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM

The AM was killed somehow, possibly due to preemption. Does it always happen? The ResourceManager log would be helpful.

On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele <gangele...@gmail.com> wrote:

I can't see the application logs here. All the logs are going into stderr. Can anybody help here?

On 30 July 2015 at 12:21, Jeetendra Gangele <gangele...@gmail.com> wrote:

I am running the command below; this is the default Spark Pi example, but all the logs are going to stderr while the job succeeds at the terminal. I guess there is a configuration issue and the job is not launching at all.

/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10

Complete log:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1438090734187_0010_01
15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1
15/07/30 12:13:33 WARN spark.SparkConf: SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411'). This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
 - ./spark-submit with --driver-java-options to set -X options for a driver
 - spark.executor.extraJavaOptions to set -X options for executors
 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around.
15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around.
15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/30 12:13:33 INFO Remoting: Starting remoting
15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.21.1.77:53411]
15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' on port 53411.
15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker
15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster
15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at
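For reference, a minimal sketch of how the ratio Jerry mentions could be raised in yarn-site.xml; the value 3.0 is an illustrative choice, not from this thread, and needs tuning per workload:

    <!-- yarn-site.xml: allow more virtual memory per unit of physical memory,
         so containers with a large vmem footprint are not killed -->
    <property>
      <name>yarn.nodemanager.vmem-pmem-ratio</name>
      <value>3.0</value>
    </property>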
RE: kafka offset commit in spark streaming 1.2
If you're using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable) and explicitly call commitOffset to update the offset to Kafka AFTER the WAL is done. No matter what you set autocommit.enable to, internally Spark Streaming will set it to false to turn off the autocommit mechanism.

Thanks
Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams passed while initializing the KafkaDStream?

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
RE: kafka offset commit in spark streaming 1.2
Please see the inline comments.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:51 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

So if WAL is disabled, how can a developer commit offsets explicitly in a Spark Streaming app, since we don't write code that will be executed in the receiver?

I think it is difficult for user code to commit offsets explicitly in the receiver-based Spark Streaming Kafka API. If you want to commit offsets explicitly, you could try the Spark Streaming Kafka direct API, newly added in Spark 1.3+, where you manage the offsets yourself; it is implemented on top of Kafka's low-level API.

Plus, since offset commitment is asynchronous, is it possible that the last offset is not committed yet when the next stream batch starts on the receiver, so it may get duplicate data?

Yes, it is possible, so the receiver-based Spark Streaming Kafka API cannot guarantee no duplication and no data loss. If you enable WAL, no data loss can be guaranteed, but you may still see duplication. So the best way is to use the Spark Streaming Kafka direct API with your own offset management to ensure exactly-once semantics.

On Mon, Jul 6, 2015 at 6:16 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

If you disable WAL, Spark Streaming itself will not manage anything offset-related. If auto commit is enabled (true), Kafka itself will update offsets in a time-based way; if auto commit is disabled, nothing will call commitOffset and you need to call this API yourself. Also, Kafka's offset commit mechanism is timer-based, so it is asynchronous with respect to replication.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable WAL and use replication of receiver data with StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating the message, or will it use the autocommit.enable value? And if it uses that value, what if autocommit.enable is set to false: when does the receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

If you're using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable) and explicitly call commitOffset to update the offset to Kafka AFTER the WAL is done. No matter what you set autocommit.enable to, internally Spark Streaming will set it to false to turn off the autocommit mechanism.

Thanks
Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams passed while initializing the KafkaDStream?
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
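As a rough sketch of the direct API mentioned above (not from this thread; the broker address and topic name are placeholders), each batch exposes its Kafka offset ranges so you can persist them yourself:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    object DirectKafkaSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("DirectKafkaSketch"), Seconds(10))

        // Direct stream (Spark 1.3+): no receiver, no ZooKeeper-managed offsets.
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("mytopic")) // placeholder topic

        stream.foreachRDD { rdd =>
          // Each batch RDD carries the Kafka offset ranges it was built from.
          val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // Process the batch first, then persist the offsets yourself
          // (ZooKeeper, a database, ...) to get exactly-once semantics.
          offsets.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}"))
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }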
RE: kafka offset commit in spark streaming 1.2
If you disable WAL, Spark Streaming itself will not manage anything offset-related. If auto commit is enabled (true), Kafka itself will update offsets in a time-based way; if auto commit is disabled, nothing will call commitOffset and you need to call this API yourself. Also, Kafka's offset commit mechanism is timer-based, so it is asynchronous with respect to replication.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable WAL and use replication of receiver data with StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating the message, or will it use the autocommit.enable value? And if it uses that value, what if autocommit.enable is set to false: when does the receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

If you're using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable) and explicitly call commitOffset to update the offset to Kafka AFTER the WAL is done. No matter what you set autocommit.enable to, internally Spark Streaming will set it to false to turn off the autocommit mechanism.

Thanks
Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams passed while initializing the KafkaDStream?

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
The shuffle data can be deleted through the weak-reference mechanism; you could check the code of ContextCleaner. You could also trigger a full GC manually with JVisualVM or some other tool to see whether the shuffle files are deleted.

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 5:28 PM
To: Shao, Saisai; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

Jerry, I agree with you. However, in my case, I kept monitoring the blockmanager folder. I do see the number of files decrease sometimes, but the folder's size keeps increasing. Below is a screenshot of the folder; you can see some old files are somehow not deleted. [screenshot attachment omitted]

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

From the stack I think this problem may be due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, after this timeout the old broadcast variable is deleted, and you will meet this exception when you try to use it again after that time limit.

Basically I think you don't need to use this configuration: Spark Streaming will automatically delete old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

When I ran a Spark Streaming application for longer, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
  at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
  at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
  at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
  at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org
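As a side note to this thread: if the application controls broadcast lifetimes itself, it can drop old broadcasts explicitly instead of relying on spark.cleaner.ttl. A minimal sketch (loadDict() is a made-up placeholder for whatever builds the broadcast data):

    // Rebuild a broadcast on demand instead of letting a TTL reap it.
    var dict = sc.broadcast(loadDict())

    def refresh(): Unit = {
      dict.unpersist(blocking = true) // drop the old blocks on the executors
      dict = sc.broadcast(loadDict()) // re-broadcast fresh data
    }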
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
From the stack I think this problem may be due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, after this timeout the old broadcast variable is deleted, and you will meet this exception when you try to use it again after that time limit.

Basically I think you don't need to use this configuration: Spark Streaming will automatically delete old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

When I ran a Spark Streaming application for longer, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
  at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
  at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
  at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
  at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
  ... 25 more
15/06/09 12:57:30 ERROR
RE: Possible long lineage issue when using DStream to update a normal RDD
I think you could use checkpoint to cut the lineage of `MyRDD`. I have a similar scenario and I use checkpoint to work around this problem :)

Thanks
Jerry

-----Original Message-----
From: yaochunnan [mailto:yaochun...@gmail.com]
Sent: Friday, May 8, 2015 1:57 PM
To: user@spark.apache.org
Subject: Possible long lineage issue when using DStream to update a normal RDD

Hi all,
Recently in our project, we need to update an RDD using data regularly received from a DStream. I plan to use the foreachRDD API to achieve this:

var MyRDD = ...
dstream.foreachRDD { rdd =>
  MyRDD = MyRDD.join(rdd)...
  ...
}

Is this usage correct? My concern is that, as I am repeatedly and endlessly reassigning MyRDD in order to update it, it will create a too-long RDD lineage to process when I want to query MyRDD later on (similar to https://issues.apache.org/jira/browse/SPARK-4672). Maybe I should:

1. cache or checkpoint the latest MyRDD and unpersist the old MyRDD every time a dstream comes in.
2. use the unpublished IndexedRDD (https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD updates.

As I lack experience with Spark Streaming and IndexedRDD, I am here to make sure my thoughts are on the right track. Your wise suggestions will be greatly appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Possible long lineage issue when using DStream to update a normal RDD
IIUC only checkpoint will clean the lineage information; cache will not cut the lineage. Also, checkpoint will put the data in HDFS, not on local disk :) I think you can use foreachRDD to do such RDD-update work; it's OK as far as I can tell from your code snippet.

From: Chunnan Yao [mailto:yaochun...@gmail.com]
Sent: Friday, May 8, 2015 2:51 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Possible long lineage issue when using DStream to update a normal RDD

Thank you for this suggestion! But may I ask what's the advantage of using checkpoint instead of cache here, since they both cut lineage? I only know checkpoint saves the RDD to disk, while cache keeps it in memory. So maybe it's for reliability? Also, on http://spark.apache.org/docs/latest/streaming-programming-guide.html I have not seen a usage of foreachRDD like mine. Here I am not pushing data to an external system; I just use it to update an RDD in Spark. Is this right?

2015-05-08 14:03 GMT+08:00 Shao, Saisai <saisai.s...@intel.com>:

I think you could use checkpoint to cut the lineage of `MyRDD`. I have a similar scenario and I use checkpoint to work around this problem :)

Thanks
Jerry

-----Original Message-----
From: yaochunnan [mailto:yaochun...@gmail.com]
Sent: Friday, May 8, 2015 1:57 PM
To: user@spark.apache.org
Subject: Possible long lineage issue when using DStream to update a normal RDD

Hi all,
Recently in our project, we need to update an RDD using data regularly received from a DStream. I plan to use the foreachRDD API to achieve this:

var MyRDD = ...
dstream.foreachRDD { rdd =>
  MyRDD = MyRDD.join(rdd)...
  ...
}

Is this usage correct? My concern is that, as I am repeatedly and endlessly reassigning MyRDD in order to update it, it will create a too-long RDD lineage to process when I want to query MyRDD later on (similar to https://issues.apache.org/jira/browse/SPARK-4672). Maybe I should:

1. cache or checkpoint the latest MyRDD and unpersist the old MyRDD every time a dstream comes in.
2. use the unpublished IndexedRDD (https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD updates.

As I lack experience with Spark Streaming and IndexedRDD, I am here to make sure my thoughts are on the right track. Your wise suggestions will be greatly appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
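To make the checkpoint workaround concrete, here is a minimal sketch (assuming a pair DStream `dstream` and a SparkContext `sc` already exist; the HDFS path and the every-10-batches cadence are illustrative):

    import org.apache.spark.rdd.RDD

    sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints") // placeholder path

    var myRDD: RDD[(String, Long)] = sc.emptyRDD
    var batch = 0L

    dstream.foreachRDD { rdd =>
      val updated = myRDD.union(rdd) // stand-in for the join-based update
      updated.cache()
      batch += 1
      if (batch % 10 == 0) updated.checkpoint() // cut the lineage periodically
      updated.count()    // materialize so the checkpoint actually runs
      myRDD.unpersist()  // drop the previous generation
      myRDD = updated
    }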
RE: shuffle.FetchFailedException in spark on YARN job
I don't think this problem is related to Netty or NIO; switching to nio will not change this part of the code path, which reads the index file for the sort-based shuffle reader. I think you could check your system from a few angles:

1. Is there any hardware problem, such as a full disk, that makes this file lost or non-existent? That can introduce such an exception.
2. Do you have any other exception besides this one? A shuffle fetch failure generally means your job is in an abnormal state, and some other problem may also introduce this error.

Thanks
Jerry

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, April 20, 2015 2:56 PM
To: roy
Cc: user@spark.apache.org
Subject: Re: shuffle.FetchFailedException in spark on YARN job

Which version of Spark are you using? Did you try using spark.shuffle.blockTransferService=nio

Thanks
Best Regards

On Sat, Apr 18, 2015 at 11:14 PM, roy <rp...@njit.edu> wrote:

Hi,

My spark job is failing with the following error message:

org.apache.spark.shuffle.FetchFailedException: /mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index (No such file or directory)
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:89)
  at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
  at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index (No such file or directory)
  at java.io.FileInputStream.open(Native Method)
  at java.io.FileInputStream.<init>(FileInputStream.java:146)
  at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
  at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:235)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:268)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:115)
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:76)
  at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  ... 7 more
)

my job
RE: Spark Directed Acyclic Graph / Jobs
I think this paper will be a good resource (https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf); the Dryad paper is also a good one.

Thanks
Jerry

From: James King [mailto:jakwebin...@gmail.com]
Sent: Friday, April 17, 2015 3:26 PM
To: user
Subject: Spark Directed Acyclic Graph / Jobs

Is there a good resource that explains how Spark jobs get broken down into tasks and executions? I just need to get a better understanding of this.

Regards
j
RE: Spark + Kafka
OK, there seems to be nothing strange in your code. So we need to narrow down the cause: would you please run the KafkaWordCount example in Spark to see if it is OK? If it is, then we should focus on your implementation; otherwise Kafka potentially has some problem.

Thanks
Jerry

From: James King [mailto:jakwebin...@gmail.com]
Sent: Wednesday, April 1, 2015 6:59 PM
To: Saisai Shao
Cc: bit1...@163.com; user
Subject: Re: Spark + Kafka

This is the code. And I couldn't find anything like the log you suggested.

public KafkaLogConsumer(int duration, String master) {
  JavaStreamingContext spark = createSparkContext(duration, master);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put("test", 1);
  JavaPairDStream<String, String> input = KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
  input.print();
  spark.start();
  spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
  SparkConf sparkConf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster(master);
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(duration));
  return ssc;
}

On Wed, Apr 1, 2015 at 11:37 AM, James King <jakwebin...@gmail.com> wrote:

Thanks Saisai. Sure, will do. Just a quick note: when I set master to local[*], Spark started showing Kafka messages as expected, so the problem in my view was not having enough threads to process the incoming data. Thanks.

On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

Would you please share your code snippet, so we can identify whether there is anything wrong in it? Besides, would you please grep your driver's debug log to see if there is any debug log like "Stream xxx received block xxx"? That would mean Spark Streaming keeps receiving data from sources like Kafka.

2015-04-01 16:18 GMT+08:00 James King <jakwebin...@gmail.com>:

Thank you bit1129,

From looking at the web UI I can see 2 cores. I am also looking at http://spark.apache.org/docs/1.2.1/configuration.html but can't see an obvious configuration for the number of receivers. Can you help please?

On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com <bit1...@163.com> wrote:

Please make sure that you have given more cores than the number of receivers.

From: James King <jakwebin...@gmail.com>
Date: 2015-04-01 15:21
To: user <user@spark.apache.org>
Subject: Spark + Kafka

I have a simple setup/runtime of Kafka and Spark. I have a command-line consumer displaying arrivals to the Kafka topic, so I know messages are being received. But when I try to read from the Kafka topic I get no messages; here are some logs below. I'm thinking there aren't enough threads. How do I check that?

Thank you.
2015-04-01 08:56:50 INFO JobScheduler:59 - Starting job streaming job 142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Finished job streaming job 142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Total delay: 0.002 s for time 142787141 ms (execution: 0.000 s)
2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event ClearMetadata(142787141 ms)
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 142787141 ms
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: []
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older than 1427871405000 ms:
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old RDDs: [1427871405000 ms -> 8]
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
2015-04-01 08:56:50 INFO BlockRDD:59 - Removing RDD 8 from persistence list
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.287257 ms) RemoveRdd(8) from
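For reference, a minimal sketch of the fix James describes (shown in Scala for brevity; the same applies to the Java API, and the master values are illustrative). A receiver occupies one core/thread for its whole lifetime, so the application needs more threads than receivers:

    import org.apache.spark.SparkConf

    // With "local[1]" the single thread is consumed by the receiver and
    // no batches ever run, so print() shows nothing. Use local[n] with
    // n greater than the number of receivers, or local[*].
    val conf = new SparkConf()
      .setAppName("KafkaLogConsumer")
      .setMaster("local[*]") // or e.g. "local[2]" for a single receiver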
RE: [spark-streaming] can shuffle write to disk be disabled?
From the log you pasted, I think this (-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data) is not shuffle-spilled data but the final shuffle result. As I said, do you think shuffle is the bottleneck that makes your job run slowly? Maybe you should identify the cause first. Besides, from the log it looks like your memory is not enough to cache the data; maybe you should increase the memory size of the executor.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 6:41 PM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

I've already done that. From SparkUI Environment, Spark properties has: spark.shuffle.spill false

On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

I think you can disable it with spark.shuffle.spill=false

Thanks
Best Regards

On Wed, Mar 18, 2015 at 3:39 PM, Darren Hoo <darren@gmail.com> wrote:

Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

Yeah, as I said, your job processing time is much larger than the slide interval, and streaming jobs are executed one by one in sequence, so the next job will wait until the first one is finished and the total latency will accumulate. I think you need to identify the bottleneck of your job first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but the final shuffle data will still be written to disk; this cannot be disabled unless you mount the Spark local directory (spark.local.dir) on a ramdisk.

I have increased spark.shuffle.memoryFraction to 0.8, which I can see in SparkUI's environment variables. But spill always happens, even from the start when latency is less than the slide window (I changed it to 10 seconds). The shuffle data written to disk is really a snowball effect; it slows things down eventually. I noticed that the files spilled to disk are all very small in size but huge in number:

total 344K
drwxr-xr-x  2 root root 4.0K Mar 18 16:55 .
drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..
-rw-r--r--  1 root root  80K Mar 18 16:54 shuffle_47_519_0.data
-rw-r--r--  1 root root  75K Mar 18 16:54 shuffle_48_419_0.data
-rw-r--r--  1 root root  36K Mar 18 16:54 shuffle_48_518_0.data
-rw-r--r--  1 root root  69K Mar 18 16:55 shuffle_49_319_0.data
-rw-r--r--  1 root root  330 Mar 18 16:55 shuffle_49_418_0.data
-rw-r--r--  1 root root  65K Mar 18 16:55 shuffle_49_517_0.data

MemoryStore says:

15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far)
15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB.

Not enough space even for 512 bytes?? The executors still have plenty of free memory (ID, address, RDD blocks, memory used, disk used, active/failed/complete/total tasks, task time, input, shuffle read, shuffle write):

0       slave1:40778   0    0.0 B / 529.9 MB     0.0 B   16  0  15047  15063  2.17 h  0.0 B     402.3 MB  768.0 B
1       slave2:50452   0    0.0 B / 529.9 MB     0.0 B   16  0  14447  14463  2.17 h  0.0 B     388.8 MB  1248.0 B
1       lvs02:47325    116  27.6 MB / 529.9 MB   0.0 B   8   0  58169  58177  3.16 h  893.5 MB  624.0 B   1189.9 MB
driver  lvs02:47041    0    0.0 B / 529.9 MB     0.0 B   0   0  0      0      0 ms    0.0 B     0.0 B     0.0 B

Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster.

3 dedicated servers, each with a 16-core CPU, 16 GB of memory, and gigabit network. CPU load is quite low, about 1~3 from top, and network usage is far from saturated. I don't even do any useful complex calculations in this small SimpleApp yet.
RE: [spark-streaming] can shuffle write to disk be disabled?
Yeah, as I said, your job processing time is much larger than the slide interval, and streaming jobs are executed one by one in sequence, so the next job will wait until the first one is finished and the total latency will accumulate. I think you need to identify the bottleneck of your job first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but the final shuffle data will still be written to disk; this cannot be disabled unless you mount the Spark local directory on a ramdisk. Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 3:24 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

Hi, Saisai

Here is the duration of one of the jobs, 22 seconds in total; it is longer than the sliding window.

Stage Id  Description                    Submitted            Duration  Tasks: Succeeded/Total  Input  Output  Shuffle Read  Shuffle Write
342       foreach at SimpleApp.scala:58  2015/03/18 15:06:58  16 s      288/288                                10.6 MB
341       window at SimpleApp.scala:51   2015/03/18 15:06:52  6 s       288/288                                12.3 MB       10.6 MB

And part of the driver log:

15/03/18 15:16:36 INFO DStreamGraph: Cleared checkpoint data for time 1426662996000 ms
15/03/18 15:16:36 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1426662932000 ms)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 81.0 in stage 392.0 (TID 100515, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 75.0 in stage 392.0 (TID 100509) in 370 ms on lvs02 (75/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 82.0 in stage 392.0 (TID 100516, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 77.0 in stage 392.0 (TID 100511) in 261 ms on lvs02 (76/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 83.0 in stage 392.0 (TID 100517, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 78.0 in stage 392.0 (TID 100512) in 274 ms on lvs02 (77/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 84.0 in stage 392.0 (TID 100518, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 74.0 in stage 392.0 (TID 100508) in 569 ms on lvs02 (78/291)
15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996000 in memory on lvs02:38954 (size: 398.3 KB, free: 1073.7 MB)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 85.0 in stage 392.0 (TID 100519, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 76.0 in stage 392.0 (TID 100510) in 539 ms on lvs02 (79/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 86.0 in stage 392.0 (TID 100520, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 80.0 in stage 392.0 (TID 100514) in 296 ms on lvs02 (80/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 87.0 in stage 392.0 (TID 100521, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 81.0 in stage 392.0 (TID 100515) in 292 ms on lvs02 (81/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 88.0 in stage 392.0 (TID 100522, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 82.0 in stage 392.0 (TID 100516) in 331 ms on lvs02 (82/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 89.0 in stage 392.0 (TID 100523, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 83.0 in stage 392.0 (TID 100517) in 271 ms on lvs02 (83/291)
15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996200 in memory on lvs02:38954 (size: 31.0 KB, free: 1073.7 MB)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 90.0 in stage 392.0 (TID 100524, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 79.0 in stage 392.0 (TID 100513) in 549 ms on lvs02 (84/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 91.0 in stage 392.0 (TID 100525, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 84.0 in stage 392.0 (TID 100518) in 327 ms on lvs02 (85/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 92.0 in stage 392.0 (TID 100526, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 86.0 in stage 392.0 (TID 100520) in 293 ms on lvs02 (86/291)
15/03/18 15:16:36 INFO TaskSetManager: Starting task 93.0 in stage 392.0 (TID 100527, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:36 INFO TaskSetManager: Finished task 87.0 in stage 392.0 (TID 100521) in 257 ms on lvs02 (87/291)
15/03/18 15:16:36 INFO TaskSetManager
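For reference, the two knobs Jerry mentions map to these entries in conf/spark-defaults.conf (values are illustrative; note the scratch-directory property is spark.local.dir, and pointing it at a tmpfs mount trades memory for the on-disk shuffle writes):

    spark.shuffle.memoryFraction   0.4                  # default 0.2; more room before spilling
    spark.local.dir                /mnt/ramdisk/spark   # hypothetical tmpfs mount for shuffle scratch space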
RE: [spark-streaming] can shuffle write to disk be disabled?
Please see the inline comments.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 9:30 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Akhil Das
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

From the log you pasted, I think this (-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data) is not shuffle-spilled data but the final shuffle result.

Why is the shuffle result written to disk?

This is the internal mechanism of Spark.

As I said, do you think shuffle is the bottleneck that makes your job run slowly?

I am quite new to Spark, so I am just making wild guesses. What further information should I provide to help find the real bottleneck?

You can monitor the system metrics as well as the JVM; information from the web UI is also very useful.

Maybe you should identify the cause first. Besides, from the log it looks like your memory is not enough to cache the data; maybe you should increase the memory size of the executor.

Running two executors, the memory usage is quite low:

executor 0   8.6 MB / 4.1 GB
executor 1   23.9 MB / 4.1 GB
driver       0.0 B / 529.9 MB

Submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G
RE: [spark-streaming] can shuffle write to disk be disabled?
Would you please check your driver log or the streaming web UI to see each job's latency, including processing latency and total latency? From your code, the slide interval is just 3 seconds, so each 60-second window of data must be processed within 3 seconds; if the processing latency is larger than the slide interval, your computation power may not reach the qps you wanted. I think you need to identify the bottleneck first, and then try to tune your code, balance the data, or add more computation resources.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use spark-streaming to read messages from Kafka; the producer creates about 1500 messages per second.

def hash(x: String): Int = {
  MurmurHash3.stringHash(x)
}

val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)

val clickstream = stream.map(log => {
  // parse log ...
  (hash(log.url), HashSet(hash(log.userid)))
}).window(Seconds(60), Seconds(3))

val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
  case (url, visits) =>
    val uv = visits.size
    (uv, url)
})

upv.foreach(rdd => println(new Date() + "\n---\n" + rdd.top(20).mkString("\n") + "\n"))

It is quite quick upon startup, but after running for a few minutes it goes slower and slower and the latency can reach minutes. I found several gigabytes of shuffle writes at /tmp/spark-. With 1500 qps of messages and a window size of 60 seconds, I think it should be doable in memory without writing to disk at all. I've set executor-memory to 8G, so there is plenty of memory:

$SPARK_HOME/bin/spark-submit \
  --class SimpleApp \
  --master spark://localhost:7077 \
  --driver-memory 16G \
  --executor-memory 8G \
  target/scala-2.10/simple-assembly-1.0.jar

I also tried these settings, but it still spills to disk:

spark.master spark://localhost:7077
#spark.driver.memory 4g
#spark.shuffle.file.buffer.kb 4096
#spark.shuffle.memoryFraction 0.8
#spark.storage.unrollFraction 0.8
#spark.storage.unrollMemoryThreshold 1073741824
spark.io.compression.codec lz4
spark.shuffle.spill false
spark.serializer org.apache.spark.serializer.KryoSerializer

Where am I wrong?
RE: MappedStream vs Transform API
I think these two ways are both OK for writing a streaming job. `transform` is a more general way to transform from one DStream to another when there is no matching DStream API (but there is a matching RDD API). Using map may be more straightforward and easier to understand.

Thanks
Jerry

From: madhu phatak [mailto:phatak@gmail.com]
Sent: Monday, March 16, 2015 4:32 PM
To: user@spark.apache.org
Subject: MappedStream vs Transform API

Hi,
The current implementation of the map function in Spark Streaming looks as below:

def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

It creates an instance of MappedDStream, which is a subclass of DStream. The same function could also be implemented using the transform API:

def map[U: ClassTag](mapFunc: T => U): DStream[U] =
  this.transform(rdd => {
    rdd.map(mapFunc)
  })

Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API?

Regards,
Madhukara Phatak
http://datamantra.io/
RE: Building spark over specified tachyon
I think you could change the pom file under the Spark project to update the Tachyon-related dependency version and rebuild it again (provided the API is compatible and the behavior is the same). I'm not sure whether there is a command you can use to compile against a specific Tachyon version.

Thanks
Jerry

From: fightf...@163.com [mailto:fightf...@163.com]
Sent: Monday, March 16, 2015 11:01 AM
To: user
Subject: Building spark over specified tachyon

Hi, all

Noting that the current Spark releases are built in with Tachyon 0.5.0: if we want to recompile Spark with Maven, targeting a specific Tachyon version (let's say the most recent 0.6.0 release), how should that be done? What should the Maven compile command look like?

Thanks,
Sun.

fightf...@163.com
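A hedged sketch of the rebuild Jerry describes; it assumes the Tachyon client version appears in the Spark pom (check whether your Spark version exposes it as a property or as a literal <version> on the tachyon-client dependency), and the build profiles should match your Hadoop version:

    # 1. Edit the Tachyon dependency version in the pom, e.g. change
    #    <artifactId>tachyon-client</artifactId> ... <version>0.5.0</version>
    #    to 0.6.0 (only safe if the client API is compatible).
    # 2. Rebuild the assembly:
    mvn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package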
RE: Spark Streaming input data source list
Hi Lin,

AFAIK, currently there is no built-in receiver API for RDBMSs, but you can write a custom receiver to get data from an RDBMS; for the details you can refer to the docs.

Thanks
Jerry

From: Cui Lin [mailto:cui@hds.com]
Sent: Tuesday, March 10, 2015 8:36 AM
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming input data source list

Tathagata,

Thanks for your quick response. The link is helpful to me. Do you know of any API for streaming data from an RDBMS?

Best regards,
Cui Lin

From: Tathagata Das <t...@databricks.com>
Date: Monday, March 9, 2015 at 11:28 AM
To: Cui Lin <cui@hds.com>
Cc: user@spark.apache.org
Subject: Re: Spark Streaming input data source list

Spark Streaming has StreamingContext.socketStream(): http://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/StreamingContext.html#socketStream(java.lang.String, int, scala.Function1, org.apache.spark.storage.StorageLevel, scala.reflect.ClassTag)

TD

On Mon, Mar 9, 2015 at 11:37 AM, Cui Lin <cui@hds.com> wrote:

Dear all,

Could you send me a list of input data sources that Spark Streaming supports? My list is HDFS, Kafka, text files... I am wondering if Spark Streaming could directly read data from a certain port (e.g. 443) that my devices send to directly?

Best regards,
Cui Lin
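For illustration, a minimal custom-receiver sketch along the lines Jerry suggests (the JDBC URL, query, and poll interval are made-up placeholders; a real receiver would also need offset/watermark tracking so rows are not re-read):

    import java.sql.DriverManager
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Polls an RDBMS table and pushes rows into Spark Streaming.
    class JdbcReceiver(url: String, query: String)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart(): Unit = {
        // onStart must not block; do the polling on a separate thread.
        new Thread("jdbc-receiver") {
          override def run(): Unit = poll()
        }.start()
      }

      def onStop(): Unit = { /* the polling loop checks isStopped() */ }

      private def poll(): Unit = {
        val conn = DriverManager.getConnection(url)
        try {
          while (!isStopped()) {
            val rs = conn.createStatement().executeQuery(query)
            while (rs.next()) store(rs.getString(1)) // hand each row to Spark
            Thread.sleep(5000) // illustrative poll interval
          }
        } finally conn.close()
      }
    }

    // Usage: ssc.receiverStream(new JdbcReceiver("jdbc:...", "SELECT msg FROM events"))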
RE: distribution of receivers in spark streaming
Hi Du,

You could sleep for several seconds after creating the streaming context to let all the executors register; then the receivers can be distributed across the nodes more evenly. Setting the preferred location is another way, as you mentioned.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override the method preferredLocation() in my MyReceiver class.

On Wednesday, March 4, 2015 3:35 PM, Du Li <l...@yahoo-inc.com.INVALID> wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs:

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the Spark UI I saw that some machines were not running any instance of the receiver while some got three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark?

Thanks,
Du
RE: distribution of receivers in spark streaming
Yes, hostname is enough.

I think currently it is hard for user code to get the worker list from the standalone master. If you could get the Master object you could read the worker list from it, but AFAIK that object may be difficult to obtain. All you can do is get the worker list manually and assign a hostname to each receiver.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com]
Sent: Thursday, March 5, 2015 2:29 PM
To: Shao, Saisai; User
Subject: Re: distribution of receivers in spark streaming

Hi Jerry,

Thanks for your response. Is there a way to get the list of currently registered/live workers? Even in order to provide preferredLocation, it would be safer to know which workers are active. I guess I only need to provide the hostname, right?

Thanks,
Du

On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

Hi Du,

You could sleep for several seconds after creating the streaming context to let all the executors register; then the receivers can be distributed across the nodes more evenly. Setting the preferred location is another way, as you mentioned.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override the method preferredLocation() in my MyReceiver class.

On Wednesday, March 4, 2015 3:35 PM, Du Li <l...@yahoo-inc.com.INVALID> wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs:

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the Spark UI I saw that some machines were not running any instance of the receiver while some got three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark?

Thanks,
Du
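A minimal sketch of the preferredLocation override discussed in this thread (hostnames are placeholders, and the worker list is supplied manually, as Jerry describes; the receiver body is elided):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Pin each receiver instance to a chosen worker hostname.
    class MyKafkaReceiver(host: String)
      extends Receiver[String](StorageLevel.MEMORY_ONLY) {

      // The scheduler will try to launch this receiver on `host`.
      override def preferredLocation: Option[String] = Some(host)

      def onStart(): Unit = { /* connect to Kafka and call store(...) */ }
      def onStop(): Unit = {}
    }

    // Spread 8 receivers round-robin over a manually supplied worker list.
    val workers = Seq("node1", "node2", "node3", "node4", "node5") // placeholder hostnames
    val streams = (0 until 8).map(i =>
      ssc.receiverStream(new MyKafkaReceiver(workers(i % workers.size))))
    ssc.union(streams)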
RE: Having lots of FetchFailedException in join
Yes, if one key has too many values, there is still a chance to hit OOM.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:49 PM
To: Shao, Saisai
Cc: Cheng, Hao; user
Subject: Re: Having lots of FetchFailedException in join

I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk, but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group's elements, right? I guess the OOM happened there when reading very large groups.

Jianshi

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

I think what you could do is to monitor through the web UI to see if there is any skew or other symptom in the shuffle write and read. For GC you could use the configuration below, as you mentioned. On the Spark core side, all the shuffle-related operations can spill the data to disk with no need to read the whole partition into memory. But if you use SparkSQL, it depends on how SparkSQL uses these operators. CC @hao in case he has some thoughts on it.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai
Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What are your suggested settings for monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found that SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) wants to make groupByKey use external storage; it is still in open status. Does that mean that, for now, groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read each group as a whole while consuming it? How can I deal with key skewness in joins? Is there a skew-join implementation?

Jianshi

On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

Hi Jianshi,

From my understanding, it may not be a problem of NIO or Netty. Looking at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap); theoretically EAOM can spill the data to disk if memory is not enough, but there might be some issues when the join key is skewed or the number of keys is small, so you will meet OOM.

Maybe you could monitor each stage's or task's shuffle and GC status, as well as the system status, to identify the problem.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting thing is that when I'm using the netty-based spark.shuffle.blockTransferService, there are no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why they don't show up there? I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang <jianshi.hu...@gmail.com> wrote:

I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors. I'm doing a big join operation.
15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247
RE: Having lots of FetchFailedException in join
Hi Jianshi, From my understanding, it may not be a problem of NIO or Netty; looking at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap). Theoretically EAOM can spill the data to disk if memory is not enough, but there might be some issues when the join key is skewed or the number of keys is small, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status, as well as system status, to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting thing is that when I'm using the netty-based spark.shuffle.blockTransferService, there are no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why they don't appear there? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors; I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
RE: Having lots of FetchFailedException in join
I think what you could do is to monitor through the web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. On the Spark core side, all the shuffle-related operations can spill the data to disk and there is no need to read the whole partition into memory. But if you use Spark SQL, it depends on how Spark SQL uses these operators. CC @hao in case he has some thoughts on it. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 3:28 PM To: Shao, Saisai Cc: user Subject: Re: Having lots of FetchFailedException in join Hi Saisai, What are your suggested settings for monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions), which wants to make groupByKey use external storage. It's still in open status. Does that mean groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read the group as a whole during consumption? How can I deal with key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be a problem of NIO or Netty; looking at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap). Theoretically EAOM can spill the data to disk if memory is not enough, but there might be some issues when the join key is skewed or the number of keys is small, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status, as well as system status, to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting thing is that when I'm using the netty-based spark.shuffle.blockTransferService, there are no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why they don't appear there? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors; I'm doing a big join operation. 
15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31
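The skew-join question above is left open in the thread. One common mitigation (a sketch of a general technique, not something proposed in the thread) is to salt the hot keys so a single key's values spread over several reduce tasks instead of one task's memory:

import scala.util.Random

val numSalts = 16  // fan-out per key; tune to the observed skew

// big: RDD[(K, V)] with skewed keys, small: RDD[(K, W)], both assumed inputs.
val saltedBig   = big.map { case (k, v) => ((k, Random.nextInt(numSalts)), v) }
// Replicate the other side once per salt so every salted key finds its match.
val saltedSmall = small.flatMap { case (k, w) =>
  (0 until numSalts).map(salt => ((k, salt), w))
}

val joined = saltedBig.join(saltedSmall)
  .map { case ((k, _), (v, w)) => (k, (v, w)) }  // strip the salt afterwards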
RE: Monitoring Spark with Graphite and Grafana
Cool, great job☺. Thanks Jerry From: Ryan Williams [mailto:ryan.blake.willi...@gmail.com] Sent: Thursday, February 26, 2015 6:11 PM To: user; d...@spark.apache.org Subject: Monitoring Spark with Graphite and Grafana If anyone is curious to try exporting Spark metrics to Graphite, I just published a post about my experience doing that, building dashboards in Grafana (http://grafana.org/), and using them to monitor Spark jobs: http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/ Code for generating Grafana dashboards tailored to the metrics emitted by Spark is here: https://github.com/hammerlab/grafana-spark-dashboards. If anyone else is interested in working on expanding MetricsSystem to make this sort of thing more useful, let me know; I've been working on it a fair amount and have a bunch of ideas about where it should go. Thanks, -Ryan
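For anyone wanting to reproduce this setup, a minimal conf/metrics.properties enabling Spark's built-in GraphiteSink looks roughly like the following (host, port, and prefix are placeholders):

# Send metrics from all instances (driver, executors, master, worker) to Graphite.
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark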
RE: spark streaming window operations on a large window size
I don't think current Spark Streaming supports window operations which go beyond the available memory. Internally Spark Streaming keeps all the data belonging to the effective window in memory; if the memory is not enough, BlockManager will discard blocks according to an LRU policy, so something unexpected will occur. Thanks Jerry -Original Message- From: avilevi3 [mailto:avile...@gmail.com] Sent: Monday, February 23, 2015 12:57 AM To: user@spark.apache.org Subject: spark streaming window operations on a large window size Hi guys, does spark streaming support window operations on a sliding window whose data is larger than the available memory? We would like to do this. Currently we are using kafka as input, but we could change that if needed. thanks Avi
RE: Union and reduceByKey will trigger shuffle even same partition?
If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know that the data is already well partitioned. So if you want to avoid a shuffle, you have to write the code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic. Thanks Saisai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org Subject: Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple page rank program, but it is slow. And I dug out that part of the reason is that a shuffle happens when I call union, even though both RDDs share the same partitioner: Below is my test code in the spark shell: import org.apache.spark.HashPartitioner sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val beta = 0.8 val numOfPartition = 6 val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist var ranks = links.mapValues(_ => 1.0) var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist for (i <- 1 until 2) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size * beta)) } ranks = contributions.union(leakedMatrix).reduceByKey(_ + _) } ranks.lookup(1) In the above code, links joins ranks and should preserve the partitioning, and leakedMatrix also shares the same partitioner, so I expected no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey after that. But in the end there is shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once and everything after that to be local operations, but the screen shows otherwise. Do I have any misunderstanding here? [Spark UI screenshot omitted]
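To illustrate the mapPartitions suggestion above, a rough sketch of a per-partition reduce that introduces no shuffle, assuming pairs is an RDD[(Int, Double)] already partitioned by key:

// Reduce by key within each partition only; Spark does not shuffle here.
val locallyReduced = pairs.mapPartitions({ iter =>
  val sums = scala.collection.mutable.Map.empty[Int, Double]
  iter.foreach { case (k, v) => sums(k) = sums.getOrElse(k, 0.0) + v }
  sums.iterator
}, preservesPartitioning = true)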
RE: Union and reduceByKey will trigger shuffle even same partition?
I think some RDD APIs like zipPartitions or others can do what you want. You might check the docs. Thanks Jerry From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 1:35 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? This also triggers an interesting question: how can I do this locally in code if I want to? For example: I have RDDs A and B with the same partitioning; if I want to join A to B, I might just want to do a mapper-side join (although B itself might be big, B's local partition is known to be small enough to put in memory). How can I access the other RDD's local partition inside the mapPartitions method? Is there any way to do this in Spark? From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Monday, February 23, 2015 3:13 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know that the data is already well partitioned. So if you want to avoid a shuffle, you have to write the code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic. Thanks Saisai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org Subject: Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple page rank program, but it is slow. And I dug out that part of the reason is that a shuffle happens when I call union, even though both RDDs share the same partitioner: Below is my test code in the spark shell: import org.apache.spark.HashPartitioner sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val beta = 0.8 val numOfPartition = 6 val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist var ranks = links.mapValues(_ => 1.0) var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist for (i <- 1 until 2) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size * beta)) } ranks = contributions.union(leakedMatrix).reduceByKey(_ + _) } ranks.lookup(1) In the above code, links joins ranks and should preserve the partitioning, and leakedMatrix also shares the same partitioner, so I expected no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey after that. But in the end there is shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once and everything after that to be local operations, but the screen shows otherwise. Do I have any misunderstanding here? [Spark UI screenshot omitted]
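A sketch of what that zipPartitions-based local join could look like, assuming a and b are co-partitioned RDDs of (String, Int) pairs with the same partitioner and b's partitions fit in memory:

// Join partition-by-partition: load b's partition into a map, stream a's.
val joined = a.zipPartitions(b, preservesPartitioning = true) { (aIter, bIter) =>
  val lookup = bIter.toMap                  // b's local partition held in memory
  aIter.flatMap { case (k, v) =>
    lookup.get(k).map(w => (k, (v, w)))     // emit only matching keys
  }
}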
RE: Union and reduceByKey will trigger shuffle even same partition?
I've no context of this book; AFAIK union will not trigger a shuffle, as it just puts the partitions together. The operator reduceByKey() is what actually triggers the shuffle. Thanks Jerry From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:26 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? In the book Learning Spark: [image of a book excerpt omitted] So here it means only that no shuffle happens across the network, but a shuffle is still done locally? Even if that is the case, why would union trigger a shuffle? I think union just appends the RDDs together. From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Monday, February 23, 2015 3:13 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know that the data is already well partitioned. So if you want to avoid a shuffle, you have to write the code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic. Thanks Saisai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org Subject: Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple page rank program, but it is slow. And I dug out that part of the reason is that a shuffle happens when I call union, even though both RDDs share the same partitioner: Below is my test code in the spark shell: import org.apache.spark.HashPartitioner sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val beta = 0.8 val numOfPartition = 6 val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist var ranks = links.mapValues(_ => 1.0) var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist for (i <- 1 until 2) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size * beta)) } ranks = contributions.union(leakedMatrix).reduceByKey(_ + _) } ranks.lookup(1) In the above code, links joins ranks and should preserve the partitioning, and leakedMatrix also shares the same partitioner, so I expected no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey after that. But in the end there is shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once and everything after that to be local operations, but the screen shows otherwise. Do I have any misunderstanding here? [Spark UI screenshot omitted]
RE: Spark Metrics Servlet for driver and executor
Hi Judy, For the driver, it is /metrics/json; there's no MetricsServlet for executors. Thanks Jerry From: Judy Nash [mailto:judyn...@exchange.microsoft.com] Sent: Friday, February 6, 2015 3:47 PM To: user@spark.apache.org Subject: Spark Metrics Servlet for driver and executor Hi all, Looking at the spark MetricsServlet. What are the urls exposing the driver and executor json responses? Found master and worker successfully, but can't find the urls that return json for the other 2 sources. Thanks! Judy
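A quick way to check the driver endpoint, as a sketch (the host is an assumption and port 4040 is the default driver UI port):

import scala.io.Source

// Pull the MetricsServlet JSON from the driver's web UI.
val json = Source.fromURL("http://driver-host:4040/metrics/json").mkString
println(json)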
RE: Error KafkaStream
Did you include the Kafka jars? This StringDecoder is under kafka.serializer; you can refer to the unit test KafkaStreamSuite in Spark to see how to use this API. Thanks Jerry From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 9:44 AM To: Shao, Saisai Cc: Sean Owen; user@spark.apache.org Subject: Re: Error KafkaStream Hi Shao, When I changed to StringDecoder I got this compile error: [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:78: not found: type StringDecoder [error] KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:85: value split is not a member of Nothing [error] val words = unifiedStream.flatMap(_.split(" ")) [error] /sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:86: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(Nothing, Long)] [error] val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(20), Seconds(10), 2) [error] three errors found [error] (examples/compile:compile) Compilation failed On Feb 6, 2015, at 02:11, Shao, Saisai saisai.s...@intel.com wrote: Hi, I think you should change the `DefaultDecoder` in your type parameters into `StringDecoder`; it seems you want to decode the message into String. `DefaultDecoder` returns Array[Byte], not String, so the class cast here will fail. Thanks Jerry -Original Message- From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 12:04 AM To: Sean Owen Cc: user@spark.apache.org Subject: Re: Error KafkaStream I don’t think so, Sean. On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote: Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I’m getting this error in KafkaWordCount; TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7 Any idea what it could be? 
Below is the piece of code: val kafkaStream = { val kafkaParams = Map[String, String]("zookeeper.connect" -> "achab3:2181", "group.id" -> "mygroup", "zookeeper.connect.timeout.ms" -> "1", "kafka.fetch.message.max.bytes" -> "400", "auto.offset.reset" -> "largest") val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) val KafkaDStreams = (1 to numStreams).map { _ => KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) } val unifiedStream = ssc.union(KafkaDStreams) unifiedStream.repartition(sparkProcessingParallelism) } Thanks Guys
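Putting the two replies together, the corrected stream creation would look roughly like the sketch below; note the extra import, which is also what resolves the "not found: type StringDecoder" compile error above:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Decode Kafka messages as Strings instead of raw Array[Byte].
val kafkaDStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)
}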
RE: Error KafkaStream
Hi, I think you should change the `DefaultDecoder` in your type parameters into `StringDecoder`; it seems you want to decode the message into String. `DefaultDecoder` returns Array[Byte], not String, so the class cast here will fail. Thanks Jerry -Original Message- From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] Sent: Friday, February 6, 2015 12:04 AM To: Sean Owen Cc: user@spark.apache.org Subject: Re: Error KafkaStream I don’t think so, Sean. On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote: Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue? On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I’m getting this error in KafkaWordCount; TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7 Any idea what it could be? Below is the piece of code: val kafkaStream = { val kafkaParams = Map[String, String]("zookeeper.connect" -> "achab3:2181", "group.id" -> "mygroup", "zookeeper.connect.timeout.ms" -> "1", "kafka.fetch.message.max.bytes" -> "400", "auto.offset.reset" -> "largest") val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) val KafkaDStreams = (1 to numStreams).map { _ => KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2) } val unifiedStream = ssc.union(KafkaDStreams) unifiedStream.repartition(sparkProcessingParallelism) } Thanks Guys
Questions about Spark standalone resource scheduler
Hi all, I have some questions about the future development of Spark's standalone resource scheduler. We've heard that some users require multi-tenant support in standalone mode, like multi-user management, resource management and isolation, and whitelists of users. It seems current Spark standalone does not support such functionality, while resource schedulers like YARN offer such advanced management. I'm not sure what the future target of the standalone resource scheduler is: will it only target a simple implementation, shifting advanced usage to YARN? Or does it plan to add some simple multi-tenant functionality? Thanks a lot for your comments. BR Jerry
RE: Questions about Spark standalone resource scheduler
Hi Patrick, Thanks a lot for your detailed explanation. For now we have such requirements: whitelisting application submitters, per-user resource (CPU, memory) quotas, and resource allocation in Spark standalone mode. These are quite specific requirements for production use; generally the problem becomes whether we need to offer a more advanced resource scheduler compared to the current simple FIFO one. I think our aim is not to provide a general resource scheduler like Mesos/YARN (we only support Spark), but we hope to add some Mesos/YARN functionality to make better use of Spark standalone mode. I admit that a resource scheduler may have some overlap with a cloud manager; whether to offer a powerful scheduler or rely on a cloud manager is really a dilemma. I think we can break this down into some small features to improve the standalone mode. What's your opinion? Thanks Jerry -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Monday, February 2, 2015 4:49 PM To: Shao, Saisai Cc: d...@spark.apache.org; user@spark.apache.org Subject: Re: Questions about Spark standalone resource scheduler Hey Jerry, I think standalone mode will still add more features over time, but the goal isn't really for it to become equivalent to what Mesos/YARN are today. Or at least, I doubt Spark standalone will ever attempt to manage _other_ frameworks outside of Spark and become a general-purpose resource manager. In terms of having better support for multi-tenancy, meaning multiple *Spark* instances, this is something I think could be in scope in the future. For instance, we added H/A to the standalone scheduler a while back, because it let us support H/A streaming apps in a totally native way. It's a trade-off between adding new features and keeping the scheduler very simple and easy to use. We've tended to bias towards simplicity as the main goal, since this is something we want to be really easy out of the box. One thing to point out: a lot of people use standalone mode with some coarser-grained scheduler, such as running in a cloud service. In this case they really just want a simple inner cluster manager. This may even be the majority of all Spark installations. This is slightly different from Hadoop environments, where they might just want nice integration into the existing Hadoop stack via something like YARN. - Patrick On Mon, Feb 2, 2015 at 12:24 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi all, I have some questions about the future development of Spark's standalone resource scheduler. We've heard that some users require multi-tenant support in standalone mode, like multi-user management, resource management and isolation, and whitelists of users. It seems current Spark standalone does not support such functionality, while resource schedulers like YARN offer such advanced management. I'm not sure what the future target of the standalone resource scheduler is: will it only target a simple implementation, shifting advanced usage to YARN? Or does it plan to add some simple multi-tenant functionality? Thanks a lot for your comments. BR Jerry
RE: reduceByKeyAndWindow, but using log timestamps instead of clock seconds
That's definitely a good supplement to the current Spark Streaming; I've heard many people want to process data using log time. Looking forward to the code. Thanks Jerry -Original Message- From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: Thursday, January 29, 2015 10:33 AM To: Tobias Pfeiffer Cc: YaoPau; user Subject: Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds Ohhh nice! Would be great if you can share some code soon. It is indeed a very complicated problem and there is probably no single solution that fits all use cases. So having one way of doing things would be a great reference. Looking forward to that! On Wed, Jan 28, 2015 at 4:52 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote: My thinking is to maintain state in an RDD and update and persist it with each 2-second pass, but this also seems like it could get messy. Any thoughts or examples that might help me? I have just implemented some timestamp-based windowing on DStreams (can't share the code now, but it will be published a couple of months from now), although with the assumption that items arrive in the correct order. The main challenge (rather technical) was to keep proper state across RDD boundaries and to tell the state "you can mark this partial window from the last interval as 'complete' now" without shuffling too much data around. For example, if there are some empty intervals, you don't know when the next item to go into the partial window will arrive, or if there will be one at all. I guess if you want to have out-of-order tolerance, that will become even trickier, as you need to define and think about some timeout for partial windows in your state... Tobias
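As a rough illustration of the idea (not the poster's implementation, which isn't published), one can bucket records by their own timestamp rather than arrival time, assuming events carry epoch milliseconds and arrive roughly in order:

case class Event(ts: Long, value: Long)

val windowMs = 60 * 1000L  // one-minute event-time buckets

// events: DStream[Event]; key each record by its event-time bucket.
val perBucket = events
  .map(e => (e.ts / windowMs, e.value))
  .reduceByKey(_ + _)
// A bucket can span several micro-batches, so these partial sums still need
// to be merged across batches (e.g. with updateStateByKey) before a bucket
// can be marked complete; that is the tricky state-keeping the thread mentions.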
RE: Why must the dstream.foreachRDD(...) parameter be serializable?
Aha, you’re right, I made a wrong comparison; the reason might be only checkpointing :). Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Wednesday, January 28, 2015 10:39 AM To: Shao, Saisai Cc: user Subject: Re: Why must the dstream.foreachRDD(...) parameter be serializable? Hi, thanks for the answers! On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai saisai.s...@intel.com wrote: Also this `foreachFunc` is more like an action function on an RDD; think of rdd.foreach(func), in which `func` needs to be serializable. So maybe your way of using it is not the normal way :). Yeah, I totally understand why func in rdd.foreach(func) must be serializable (because it's sent to the executors), but I didn't get why a function that's not shipped around must be serializable, too. The explanations made sense, though :-) Thanks Tobias
RE: Why must the dstream.foreachRDD(...) parameter be serializable?
Hey Tobias, I think one consideration is checkpointing of the DStream, which guarantees driver fault tolerance. Also this `foreachFunc` is more like an action function on an RDD; think of rdd.foreach(func), in which `func` needs to be serializable. So maybe your way of using it is not the normal way :). Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Wednesday, January 28, 2015 10:16 AM To: user Subject: Why must the dstream.foreachRDD(...) parameter be serializable? Hi, I want to do something like dstream.foreachRDD(rdd => if (someCondition) ssc.stop()), so in particular the function does not touch any element in the RDD and runs completely within the driver. However, this fails with a NotSerializableException because $outer is not serializable etc. The DStream code says: def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } To be honest, I don't understand the comment. Why must that function be serializable even when there is no RDD action involved? Thanks Tobias
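To make the checkpointing point concrete, a small sketch (the path is an assumption): once checkpointing is enabled, the whole DStream graph, including every foreachRDD function, is what gets serialized to the checkpoint directory.

ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")  // assumed path

dstream.foreachRDD { (rdd, time) =>
  // This closure runs on the driver, but it is still part of the DStream
  // graph that is written out above, hence the serializability requirement.
  println(s"batch $time contains ${rdd.count()} records")
}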
RE: Shuffle to HDFS
Hi Larry, I don’t think current Spark’s shuffle can support HDFS as shuffle output. Anyway, is there any specific reason to spill shuffle data to HDFS or NFS? This would severely increase the shuffle time. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Sunday, January 25, 2015 4:45 PM To: u...@spark.incubator.apache.org Subject: Shuffle to HDFS How can I change the shuffle output to HDFS or NFS?
RE: where storagelevel DISK_ONLY persists RDD to
No, the current RDD persistence mechanism does not support putting data on HDFS; the directories used are controlled by spark.local.dir. Instead you can use checkpoint() to save the RDD on HDFS. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Monday, January 26, 2015 3:08 PM To: Charles Feduke Cc: u...@spark.incubator.apache.org Subject: Re: where storagelevel DISK_ONLY persists RDD to Hi, Charles Thanks for your reply. Is it possible to persist an RDD to HDFS? What is the default location when persisting an RDD with storage level DISK_ONLY? On Sun, Jan 25, 2015 at 6:26 AM, Charles Feduke charles.fed...@gmail.com wrote: I think you want to instead use `.saveAsSequenceFile` to save an RDD to someplace like HDFS or NFS if you are attempting to interoperate with another system, such as Hadoop. `.persist` is for keeping the contents of an RDD around so future uses of that particular RDD don't need to recalculate its composite parts. On Sun Jan 25 2015 at 3:36:31 AM Larry Liu larryli...@gmail.com wrote: I would like to persist an RDD to HDFS or an NFS mount. How do I change the location?
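A short sketch of the distinction (paths are assumptions): persist(DISK_ONLY) writes under the executors' spark.local.dir, while checkpoint() targets the fault-tolerant checkpoint directory.

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///user/hadoop/rdd-checkpoints")  // assumed path

val rdd = sc.textFile("hdfs:///data/input").map(_.length)
rdd.persist(StorageLevel.DISK_ONLY)  // local executor disks, not HDFS
rdd.checkpoint()                     // written to HDFS on the next action
rdd.count()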
RE: Shuffle to HDFS
Hey Larry, I don’t think Hadoop puts shuffle output in HDFS; its behavior is the same as Spark's, storing mapper output (shuffle) data on local disks. You might have misunderstood something ☺. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Monday, January 26, 2015 3:03 PM To: Shao, Saisai Cc: u...@spark.incubator.apache.org Subject: Re: Shuffle to HDFS Hi, Jerry Thanks for your reply. The reason I have this question is that I thought in Hadoop, mapper intermediate output (shuffle) is stored in HDFS. I think the default location for spark is /tmp. Larry On Sun, Jan 25, 2015 at 9:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Larry, I don’t think current Spark’s shuffle can support HDFS as shuffle output. Anyway, is there any specific reason to spill shuffle data to HDFS or NFS? This would severely increase the shuffle time. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Sunday, January 25, 2015 4:45 PM To: u...@spark.incubator.apache.org Subject: Shuffle to HDFS How can I change the shuffle output to HDFS or NFS?
RE: spark streaming with checkpoint
Hi, A new RDD will be created in each slide duration; if there’s no data coming, an empty RDD will be generated. I’m not sure there’s a way to alleviate your problem from the Spark side. Does your application design really require such a large window? If it is easy for you, can you change your implementation? I think it’s better and easier for you to change your implementation than to rely on Spark to handle this. Thanks Jerry From: Balakrishnan Narendran [mailto:balu.na...@gmail.com] Sent: Friday, January 23, 2015 12:19 AM To: Shao, Saisai Cc: user@spark.apache.org Subject: Re: spark streaming with checkpoint Thank you Jerry, Does the window operation create new RDDs for each slide duration..? I am asking this because I see a constant increase in memory even when no logs are received. If not checkpointing, is there any alternative that you would suggest? On Tue, Jan 20, 2015 at 7:08 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, Seems you have such a large window (24 hours), so the phenomenon of memory increasing is expected, because the window operation will cache the RDDs within this window in memory. So for your requirement, memory should be enough to hold the data of 24 hours. I don’t think checkpointing in Spark Streaming can alleviate such a problem, because checkpoints are mainly for fault tolerance. Thanks Jerry From: balu.naren [mailto:balu.na...@gmail.com] Sent: Tuesday, January 20, 2015 7:17 PM To: user@spark.apache.org Subject: spark streaming with checkpoint I am a beginner to spark streaming, so I have a basic doubt regarding checkpoints. My use case is to calculate the number of unique users per day. I am using reduceByKeyAndWindow for this, where my window duration is 24 hours and slide duration is 5 mins. I am updating the processed records to mongodb; currently I am replacing the existing record each time. But I see the memory slowly increasing over time until the process is killed after 1 and 1/2 hours (in an aws small instance). The DB write after the restart clears all the old data. So I understand checkpointing is the solution for this. But my doubts are: * What should my checkpoint duration be..? As per the documentation it says 5-10 times the slide duration, but I need the data of an entire day. So is it ok to keep 24 hrs? * Where ideally should the checkpoint be..? Initially when I receive the stream, just before the window operation, or after the data reduction has taken place? Appreciate your help. Thank you
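One possible reimplementation along those lines, as a sketch (not from the thread; dayOf and userOf are hypothetical parser functions): key records by day and keep a running set of users with updateStateByKey, so memory holds one set per day instead of 24 hours of raw window data.

// lines: DStream[String]; stateful ops require ssc.checkpoint(...) to be set.
val perDayUsers = lines.map(l => (dayOf(l), userOf(l)))  // hypothetical parsers

val uniquesPerDay = perDayUsers
  .updateStateByKey[Set[String]] { (newUsers: Seq[String], state: Option[Set[String]]) =>
    Some(state.getOrElse(Set.empty[String]) ++ newUsers)  // grow the day's user set
  }
  .mapValues(_.size)  // distinct-user count per day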
RE: spark streaming with checkpoint
Hi, Seems you have such a large window (24 hours), so the phenomenon of memory increasing is expected, because the window operation will cache the RDDs within this window in memory. So for your requirement, memory should be enough to hold the data of 24 hours. I don't think checkpointing in Spark Streaming can alleviate such a problem, because checkpoints are mainly for fault tolerance. Thanks Jerry From: balu.naren [mailto:balu.na...@gmail.com] Sent: Tuesday, January 20, 2015 7:17 PM To: user@spark.apache.org Subject: spark streaming with checkpoint I am a beginner to spark streaming, so I have a basic doubt regarding checkpoints. My use case is to calculate the number of unique users per day. I am using reduceByKeyAndWindow for this, where my window duration is 24 hours and slide duration is 5 mins. I am updating the processed records to mongodb; currently I am replacing the existing record each time. But I see the memory slowly increasing over time until the process is killed after 1 and 1/2 hours (in an aws small instance). The DB write after the restart clears all the old data. So I understand checkpointing is the solution for this. But my doubts are: * What should my checkpoint duration be..? As per the documentation it says 5-10 times the slide duration, but I need the data of an entire day. So is it ok to keep 24 hrs? * Where ideally should the checkpoint be..? Initially when I receive the stream, just before the window operation, or after the data reduction has taken place? Appreciate your help. Thank you
RE: dynamically change receiver for a spark stream
Hi, I don't think current Spark Streaming supports this feature; all the DStream lineage is fixed after the context is started. Also, stopping a single stream is not supported; currently we would need to stop the whole streaming context to achieve what you want. Thanks Saisai -Original Message- From: jamborta [mailto:jambo...@gmail.com] Sent: Wednesday, January 21, 2015 3:09 AM To: user@spark.apache.org Subject: dynamically change receiver for a spark stream Hi all, we have been trying to set up a stream using a custom receiver that would pick up data from sql databases. We'd like to keep that streaming context running and dynamically change the streams on demand, adding and removing streams as needed. Alternatively, if a stream is fixed, is it possible to stop a stream, change the config and start it again? thanks,
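The stop-and-rebuild route, sketched below (graph construction elided; batch interval is an assumption): stop the streaming context while keeping the SparkContext alive, then build a new context with the changed receivers.

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Stop streaming gracefully but keep the underlying SparkContext running.
ssc.stop(stopSparkContext = false, stopGracefully = true)

val ssc2 = new StreamingContext(sc, Seconds(10))
// ...re-create the receiver streams here with the new configuration...
ssc2.start()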
RE: Streaming with Java: Expected ReduceByWindow to Return JavaDStream
Hi Jeff, From my understanding it seems more like a bug: since JavaDStreamLike is used from Java code, returning a Scala DStream is not reasonable. You can fix this by submitting a PR, or I can help you fix it. Thanks Jerry From: Jeff Nadler [mailto:jnad...@srcginc.com] Sent: Monday, January 19, 2015 2:04 PM To: user@spark.apache.org Subject: Streaming with Java: Expected ReduceByWindow to Return JavaDStream Can anyone tell me if my expectations are sane? I'm trying to do a reduceByWindow using the 3-arg signature (not providing an inverse reduce function): JavaDStream<whatevs> reducedStream = messages.reduceByWindow((x, y) -> reduce(x, y), Durations.seconds(5), Durations.seconds(5)); This isn't building; it looks like it's returning DStream, not JavaDStream. From JavaDStreamLike.scala, it looks like this signature returns DStream, while the 4-arg signature with the inverse reduce returns JavaDStream: def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = { dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration) } So I'm just a noob. Is this a bug or am I missing something? Thanks! Jeff Nadler
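A hedged sketch of the kind of one-method change such a PR might make in JavaDStreamLike.scala, mirroring the 4-arg overload (the explicit fromDStream wrap stands in for whatever conversion the real patch would use):

def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): JavaDStream[T] = {
  // Wrap the Scala DStream so the 3-arg overload also returns JavaDStream.
  JavaDStream.fromDStream(
    dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration))
}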
RE: How to replay consuming messages from kafka using spark streaming?
I think there are two solutions: 1. Enable the write ahead log in Spark Streaming if you're using Spark 1.2. 2. Use a third-party Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer). Thanks Saisai -Original Message- From: mykidong [mailto:mykid...@gmail.com] Sent: Thursday, January 15, 2015 11:59 AM To: user@spark.apache.org Subject: How to replay consuming messages from kafka using spark streaming? Hi, My Spark Streaming job is doing something like Kafka ETL to HDFS. For instance, every 10 min. my streaming job retrieves messages from kafka and saves them as avro files on hdfs. My question is: if a worker sometimes fails to write avro to hdfs, I want to replay consuming messages from the last succeeded kafka offset. I think the Spark Streaming Kafka receiver is written using the Kafka high-level consumer API, not the simple consumer API. Any idea how to replay kafka consumption in spark streaming? - Kidong.
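Solution 1, sketched (app name, batch interval, and checkpoint path are assumptions): turn on the receiver write ahead log and set a checkpoint directory for it to write under.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-etl")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // Spark 1.2+

val ssc = new StreamingContext(conf, Seconds(600))
ssc.checkpoint("hdfs:///user/etl/checkpoints")  // the WAL lives under this directory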
RE: Better way of measuring custom application metrics
Now I understand your requirement; maybe there are some limitations in the current MetricsSystem. I think we can improve it. Thanks Jerry From: Enno Shioji [mailto:eshi...@gmail.com] Sent: Sunday, January 4, 2015 5:46 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: Re: Better way of measuring custom application metrics Hi Jerry, thanks for your answer. I had looked at MetricsSystem, but I couldn't see how I could use it in my use case, which is: stream.map { i => Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i); i * 2 } From what I can see, a Source accepts an object and describes how to poll it for metrics. Presumably that's why Sources have only Gauges and never Meters, for example. In my case, I don't have a state that I want Spark's MetricsSystem to poll. If I could get a reference to an internal metricRegistry instance AND a task identifier in my functions, I could achieve the same thing while using Spark's metric configuration, but I couldn't find a way to do this either... On Sun, Jan 4, 2015 at 2:46 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi, I think there’s a StreamingSource in Spark Streaming which exposes the Spark Streaming running status to the metrics sink; you can connect it with a Graphite sink to expose metrics to Graphite. I’m not sure if this is what you want. Besides, you can customize the Source and Sink of the MetricsSystem to build your own and configure them in metrics.properties with the class names so they get loaded by the metrics system; for the details you can refer to http://spark.apache.org/docs/latest/monitoring.html or the source code. Thanks Jerry From: Enno Shioji [mailto:eshi...@gmail.com] Sent: Sunday, January 4, 2015 7:47 AM To: user@spark.apache.org Subject: Better way of measuring custom application metrics I have a hack to gather custom application metrics in a Streaming job, but I wanted to know if there is any better way of doing this. My hack consists of this singleton: object Metriker extends Serializable { @transient lazy val mr: MetricRegistry = { val metricRegistry = new MetricRegistry() val graphiteEndpoint = new InetSocketAddress("ec2-54-220-56-229.eu-west-1.compute.amazonaws.com", 2003) GraphiteReporter.forRegistry(metricRegistry).build(new Graphite(graphiteEndpoint)).start(5, TimeUnit.SECONDS) metricRegistry } @transient lazy val processId = ManagementFactory.getRuntimeMXBean.getName @transient lazy val hostId = { try { InetAddress.getLocalHost.getHostName } catch { case e: UnknownHostException => "localhost" } } def metricName(name: String): String = { "%s.%s.%s".format(name, hostId, processId) } } which I then use in my jobs like so: stream.map { i => Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i); i * 2 } Then I aggregate the metrics on Graphite. This works, but I was curious to know if anyone has a less hacky way.
RE: Better way of measuring custom application metrics
Hi, I think there’s a StreamingSource in Spark Streaming which exposes the Spark Streaming running status to the metrics sink; you can connect it with a Graphite sink to expose metrics to Graphite. I’m not sure if this is what you want. Besides, you can customize the Source and Sink of the MetricsSystem to build your own and configure them in metrics.properties with the class names so they get loaded by the metrics system; for the details you can refer to http://spark.apache.org/docs/latest/monitoring.html or the source code. Thanks Jerry From: Enno Shioji [mailto:eshi...@gmail.com] Sent: Sunday, January 4, 2015 7:47 AM To: user@spark.apache.org Subject: Better way of measuring custom application metrics I have a hack to gather custom application metrics in a Streaming job, but I wanted to know if there is any better way of doing this. My hack consists of this singleton: object Metriker extends Serializable { @transient lazy val mr: MetricRegistry = { val metricRegistry = new MetricRegistry() val graphiteEndpoint = new InetSocketAddress("ec2-54-220-56-229.eu-west-1.compute.amazonaws.com", 2003) GraphiteReporter.forRegistry(metricRegistry).build(new Graphite(graphiteEndpoint)).start(5, TimeUnit.SECONDS) metricRegistry } @transient lazy val processId = ManagementFactory.getRuntimeMXBean.getName @transient lazy val hostId = { try { InetAddress.getLocalHost.getHostName } catch { case e: UnknownHostException => "localhost" } } def metricName(name: String): String = { "%s.%s.%s".format(name, hostId, processId) } } which I then use in my jobs like so: stream.map { i => Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i); i * 2 } Then I aggregate the metrics on Graphite. This works, but I was curious to know if anyone has a less hacky way.
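For completeness, a sketch of what a custom Source could look like (class and metric names are assumptions; note that in releases of this era Source and registerSource are private[spark], which is part of why the hack above exists):

import com.codahale.metrics.{Meter, MetricRegistry}
import org.apache.spark.metrics.source.Source

class AppMetricsSource extends Source {
  override val sourceName = "myapp"
  override val metricRegistry = new MetricRegistry()
  // A Meter the application code can mark, which the configured sinks poll.
  val records: Meter = metricRegistry.meter("records")
}

// Would be wired in via SparkEnv.get.metricsSystem.registerSource(...),
// after which sinks configured in metrics.properties (e.g. Graphite) pick it up.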
RE: serialization issue with mapPartitions
Hi, The Hadoop Configuration is only Writable, not Java Serializable. You can use SerializableWritable (in Spark) to wrap the Configuration to make it serializable, and use a broadcast variable to broadcast this conf to all the nodes; then you can use it in mapPartitions, rather than serializing it within the closure. You can refer to org.apache.spark.rdd.HadoopRDD; there is a usage scenario similar to yours. Thanks Jerry. From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Friday, December 26, 2014 9:38 AM To: ey-chih chow Cc: user Subject: Re: serialization issue with mapPartitions Hi, On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.com wrote: I should rephrase my question as follows: How do I use the Hadoop Configuration of a HadoopRDD in a function passed as an input parameter to the mapPartitions function? Well, you could try to pull the `val config = job.getConfiguration()` out of the function and just use `config` inside the function, hoping that this one is serializable. Tobias
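A sketch of the suggested pattern (rdd and process are assumed): wrap the Configuration, broadcast it, and unwrap it inside mapPartitions instead of capturing it in the closure.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

val result = rdd.mapPartitions { iter =>
  val hadoopConf: Configuration = confBroadcast.value.value  // unwrap once per partition
  iter.map(record => process(record, hadoopConf))            // process() is hypothetical
}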
Question on saveAsTextFile with overwrite option
Hi, We have a requirement to save RDD output to HDFS with a saveAsTextFile-like API, but we need to overwrite the data if it already exists. I'm not sure whether current Spark supports such an operation, or whether I need to handle this manually. There's a thread in the mailing list that discussed this (http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html), but I'm not sure whether this feature is enabled, or whether it requires some configuration. Appreciate your suggestions. Thanks a lot Jerry
RE: Question on saveAsTextFile with overwrite option
Thanks Patrick for your detailed explanation. BR Jerry -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Thursday, December 25, 2014 3:43 PM To: Cheng, Hao Cc: Shao, Saisai; user@spark.apache.org; d...@spark.apache.org Subject: Re: Question on saveAsTextFile with overwrite option So the behavior of overwriting existing directories IMO is something we don't want to encourage. The reason why the Hadoop client has these checks is that it's very easy for users to do unsafe things without them. For instance, a user could overwrite an RDD that had 100 partitions with an RDD that has 10 partitions... and if they read back the RDD they would get a corrupted RDD that has a combination of data from the old and new RDD. If users want to circumvent these safety checks, we need to make them explicitly disable them. Given this, I think a config option is as reasonable as any alternative. This is already pretty easy IMO. - Patrick On Wed, Dec 24, 2014 at 11:28 PM, Cheng, Hao hao.ch...@intel.com wrote: I am wondering if we can provide a more friendly API, other than a configuration, for this purpose. What do you think, Patrick? Cheng Hao -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Thursday, December 25, 2014 3:22 PM To: Shao, Saisai Cc: user@spark.apache.org; d...@spark.apache.org Subject: Re: Question on saveAsTextFile with overwrite option Is it sufficient to set spark.hadoop.validateOutputSpecs to false? http://spark.apache.org/docs/latest/configuration.html - Patrick On Wed, Dec 24, 2014 at 10:52 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, We have a requirement to save RDD output to HDFS with a saveAsTextFile-like API, but we need to overwrite the data if it already exists. I'm not sure whether current Spark supports such an operation, or whether I need to handle this manually. There's a thread in the mailing list that discussed this (http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html), but I'm not sure whether this feature is enabled, or whether it requires some configuration. Appreciate your suggestions. Thanks a lot Jerry
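In code, the opt-out Patrick mentions looks like the following sketch (app name assumed):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("overwrite-example")
  .set("spark.hadoop.validateOutputSpecs", "false")  // allow overwriting existing output dirs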
RE: Spark Streaming Python APIs?
AFAIK, this will be a new feature in version 1.2; you can check out the master branch or the 1.2 branch to give it a try. Thanks Jerry From: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com] Sent: Monday, December 15, 2014 10:53 AM To: user@spark.apache.org Subject: Spark Streaming Python APIs? Hi spark experts Are there any Python APIs for Spark Streaming? I didn't find the Python APIs in the Spark Streaming programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html Xiaoyong
RE: KafkaUtils explicit acks
Hi, It is not trivial work to acknowledge the offsets only when an RDD is fully processed. From my understanding, modifying KafkaUtils alone is not enough to meet your requirement; you would need to add metadata management for each block/RDD, track them on both the executor and driver side, and take care of many other things as well :). Thanks Jerry From: mukh@gmail.com [mailto:mukh@gmail.com] On Behalf Of Mukesh Jha Sent: Monday, December 15, 2014 1:31 PM To: Tathagata Das Cc: francois.garil...@typesafe.com; user@spark.apache.org Subject: Re: KafkaUtils explicit acks Thanks TD and Francois for the explanation and documentation. I'm curious whether we have any performance benchmark with and without the WAL for spark-streaming-kafka. Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge logs), on top of the WAL can we modify KafkaUtils to acknowledge the offsets only when the RDDs are fully processed and are getting evicted out of Spark memory, so that we can be completely sure that all the records are processed in the system? I was thinking it might be good to have the Kafka offset information of each batch as part of the RDD's metadata and commit the offsets once the RDD's lineage is complete. On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am updating the docs right now. Here is a staged copy that you can have a sneak peek of; this will be part of Spark 1.2. http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html The updated fault-tolerance section tries to simplify the explanation of when and what data can be lost, and how to prevent that using the new experimental feature of write ahead logs. Any feedback will be much appreciated. TD On Wed, Dec 10, 2014 at 2:42 AM, francois.garil...@typesafe.com wrote: [sorry for the botched half-message] Hi Mukesh, There's been some great work on Spark Streaming reliability lately. https://www.youtube.com/watch?v=jcJq3ZalXD8 Look at the links from: https://issues.apache.org/jira/browse/SPARK-3129 I'm not aware of any doc yet (did I miss something?) but you can look at the ReliableKafkaReceiver's test suite: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala - FG On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Guys, Any insights on this? If I'm not clear enough, my question is how I can use a Kafka consumer and not lose any data in case of failures with spark-streaming. On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm working on a spark app which reads data from kafka and persists it in hbase. The Spark documentation states the below [1], that in case of worker failure we can lose some data. If not, how can I make my kafka stream more reliable? I have seen there is a simple consumer [2] but I'm not sure if it has been used/tested extensively. I was wondering if there is a way to explicitly acknowledge the kafka offsets once they are replicated in memory of other worker nodes (if it's not already done) to tackle this issue. Any help is appreciated in advance. [1] Using any input source that receives data through a network - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2).
So if a worker node fails, then the system can recompute the lost data from the leftover copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. [2] https://github.com/dibbhatt/kafka-spark-consumer Txz, Mukesh Jha -- Thanks & Regards, Mukesh Jha
RE: Spark streaming for v1.1.1 - unable to start application
Hi, I don't think this is a problem with Spark Streaming; judging from the call stack, the problem occurs when the BlockManager is initializing itself. Would you mind checking your Spark configuration, hardware, and deployment? Mostly I think it's not a problem of Spark itself. Thanks Saisai From: Sourav Chandra [mailto:sourav.chan...@livestream.com] Sent: Friday, December 5, 2014 4:36 PM To: user@spark.apache.org Subject: Spark streaming for v1.1.1 - unable to start application Hi, I am getting the below error, and due to this there are no completed stages - all are in waiting state:

14/12/05 03:31:59 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
  at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
  at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
  at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:213)
  at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:203)
  at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:47)
  at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:177)
  at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:147)
  at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:168)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230)
  at org.apache.spark.executor.Executor.<init>(Executor.scala:78)
  at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:60)
  at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
  at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Could you please let me know the reason and fix for this?
Spark version is 1.1.1 -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
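If the cluster itself checks out, one knob worth trying (a hedged suggestion, not something from this thread) is the Akka ask timeout behind the 30-second limit visible in the stack trace; the setting name below is the Spark 1.1-era one and the value is illustrative.

import org.apache.spark.SparkConf

// Raise the ask timeout (default 30s) so that a slow or briefly
// overloaded driver does not fail BlockManager registration.
val conf = new SparkConf().set("spark.akka.askTimeout", "120")  // seconds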
RE: Spark Streaming empty RDD issue
Hi, According to my knowledge of the current Spark Streaming Kafka connector, I think there's no way for the application user to detect such failures; this is handled either by the Kafka consumer with its ZK coordinator, or by the ReceiverTracker in Spark Streaming, so I think you don't need to take care of these issues from the user's perspective. If there's no new message coming to the consumer, the consumer will simply wait. Thanks Jerry -Original Message- From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] Sent: Thursday, December 4, 2014 2:47 PM To: u...@spark.incubator.apache.org Subject: Spark Streaming empty RDD issue Hi Experts I am using Spark Streaming with Kafka for real time data processing, and I am facing some issues related to Spark Streaming. I want to know how we can detect: 1) our connection has been lost, 2) our receiver is down, 3) Spark Streaming has no new messages to consume. How can we deal with these issues? I will be glad to hear from you and will be thankful to you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-empty-RDD-issue-tp20329.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
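While the first two failure cases above are handled inside the framework, case 3 (no new messages) can be observed from user code: a batch with no new messages simply yields an empty RDD. A minimal sketch, assuming an existing DStream named stream (rdd.take(1) is used because RDD.isEmpty did not exist in Spark 1.1-era releases):

stream.foreachRDD { rdd =>
  // An empty batch means no new Kafka messages arrived in this interval.
  if (rdd.take(1).isEmpty) {
    println("No new messages in this batch")
  } else {
    // ... process the batch ...
  }
}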
RE: Low Level Kafka Consumer for Spark
Hi Rod, The purpose of introducing the WAL mechanism in Spark Streaming as a general solution is to let all receivers benefit from it. Though, as you said, external sources like Kafka have their own checkpoint mechanism, so instead of storing data in the WAL we could store only metadata in the WAL and recover from the last committed offsets. But this requires a sophisticated design of the Kafka receiver with the low-level API involved, and we would also need to take care of rebalance and fault tolerance by ourselves. So right now, instead of implementing a whole new receiver, we chose to implement a simple one; though the performance is not as good, it's much easier to understand and maintain. The design purpose and implementation of the reliable Kafka receiver can be found in https://issues.apache.org/jira/browse/SPARK-4062. Improving the reliable Kafka receiver along the lines you mentioned is on our schedule. Thanks Jerry -Original Message- From: RodrigoB [mailto:rodrigo.boav...@aspect.com] Sent: Wednesday, December 3, 2014 5:44 AM To: u...@spark.incubator.apache.org Subject: Re: Low Level Kafka Consumer for Spark Dibyendu, Just to make sure I will not be misunderstood - my concerns refer to the upcoming Spark solution and not yours. I wanted to gather the perspective of someone who implemented recovery with Kafka a different way. Tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
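For reference, a minimal sketch of enabling the receiver WAL discussed above (the configuration key is the Spark 1.2-era one; the app name, checkpoint path, and batch interval are illustrative). The WAL persists received blocks under the checkpoint directory before they are acknowledged:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReliableReceiverExample")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(2))
// The WAL is written under the checkpoint directory, so one must be set.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")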
RE: Spark streaming cannot receive any message from Kafka
Hi Bill, Would you mind describing what you found a little more specifically? I'm not sure there's a parameter in KafkaUtils.createStream where you can specify the Spark parallelism; also, what is the exception stack? Thanks Jerry From: Bill Jay [mailto:bill.jaypeter...@gmail.com] Sent: Tuesday, November 18, 2014 2:47 AM To: Helena Edelson Cc: Jay Vyas; u...@spark.incubator.apache.org; Tobias Pfeiffer; Shao, Saisai Subject: Re: Spark streaming cannot receive any message from Kafka Hi all, I found the reason for this issue. It seems that in the new version, if I do not specify spark.default.parallelism in KafkaUtils.createStream, there will be an exception at the Kafka stream creation stage. In the previous versions, it seems Spark would use the default value. Thanks! Bill On Thu, Nov 13, 2014 at 5:00 AM, Helena Edelson helena.edel...@datastax.com wrote: I encounter no issues with streaming from kafka to spark in 1.1.0. Do you perhaps have a version conflict? Helena On Nov 13, 2014 12:55 AM, Jay Vyas jayunit100.apa...@gmail.com wrote: Yup, very important that n > 1 for spark streaming jobs. If local, use local[2]. The thing to remember is that your spark receiver will take a thread to itself and produce data, so you need another thread to consume it. In a cluster manager like yarn or mesos, the word "thread" is not used anymore, I guess it has a different meaning - you need 2 or more free compute slots, and that should be guaranteed by looking to see how many free node managers are running, etc. On Nov 12, 2014, at 7:53 PM, Shao, Saisai saisai.s...@intel.com wrote: Did you configure the Spark master as local? It should be local[n], n > 1, for local mode. Besides, there's a Kafka wordcount example in the Spark Streaming examples; you can try that. I've tested with the latest master, and it's OK. Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Thursday, November 13, 2014 8:45 AM To: Bill Jay Cc: u...@spark.incubator.apache.org Subject: Re: Spark streaming cannot receive any message from Kafka Bill, However, when I am currently using Spark 1.1.0, the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code. Do you see any suspicious messages in the log output? Tobias
RE: Spark streaming cannot receive any message from Kafka
Did you configure the Spark master as local? It should be local[n], n > 1, for local mode. Besides, there's a Kafka wordcount example in the Spark Streaming examples; you can try that. I've tested with the latest master, and it's OK. Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Thursday, November 13, 2014 8:45 AM To: Bill Jay Cc: u...@spark.incubator.apache.org Subject: Re: Spark streaming cannot receive any message from Kafka Bill, However, when I am currently using Spark 1.1.0, the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code. Do you see any suspicious messages in the log output? Tobias
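A minimal sketch of the local[n] point above (the ZooKeeper address, group id, and topic name are illustrative): with local[1] the single thread is taken by the receiver and nothing is left to process batches, so local[2] or more is needed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

// At least two local threads: one for the Kafka receiver, one for processing.
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
val ssc = new StreamingContext(conf, Seconds(2))

val lines = KafkaUtils.createStream(
  ssc, "zkhost:2181", "example-group", Map("example-topic" -> 1)).map(_._2)
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()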
RE: Kafka Consumer in Spark Streaming
Hi, would you mind describing your problem a little more specifically? 1. Does the Kafka broker currently have any data feeding in? 2. This code will print the lines, but not on the driver side; the code runs on the executor side, so you can check the logs in the worker dir to see if there are any printed lines under that folder. 3. Did you see any exceptions when running the app? This will help to pin down the problem. Thanks Jerry From: Something Something [mailto:mailinglist...@gmail.com] Sent: Wednesday, November 05, 2014 1:57 PM To: user@spark.apache.org Subject: Kafka Consumer in Spark Streaming I've the following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks.

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000));
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "machine:2181", "1", map);
JavaDStream<String> statuses = tweets.map(
    new Function<String, String>() {
        public String call(String status) {
            System.out.println(status);
            return status;
        }
    }
);
RE: MEMORY_ONLY_SER question
From my understanding, the Spark code uses Kryo in a streaming manner for RDD partitions; deserialization happens as the iterator moves forward. But whether Kryo internally deserializes all the objects at once or incrementally is a behavior of Kryo itself; I guess Kryo will not deserialize all the objects at once. Thanks Jerry From: Mohit Jaggi [mailto:mohitja...@gmail.com] Sent: Wednesday, November 05, 2014 2:01 PM To: Tathagata Das Cc: user@spark.apache.org Subject: Re: MEMORY_ONLY_SER question I used the word streaming but I did not mean to refer to Spark Streaming. I meant: if a partition containing 10 objects was kryo-serialized into a single buffer, then in a mapPartitions() call, as I call iter.next() 10 times to access these objects one at a time, does the deserialization happen a) once to get all 10 objects, b) 10 times, incrementally, to get an object at a time, or c) 10 times to get 10 objects and discard the wrong 9 objects [I doubt this is a design anyone would have adopted]? I think your answer is option (a), and you referred to Spark Streaming to indicate that there is no difference in its behavior from Spark core... right? If it is indeed option (a), I am happy with it and don't need to customize. If it is (b), I would like to have (a) instead. I am also wondering if Kryo is good at compression of strings and numbers. Often I have the data type as Double but it could be encoded in much fewer bits. On Tue, Nov 4, 2014 at 1:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: It is deserialized in a streaming manner as the iterator moves over the partition. This is functionality of core Spark, and Spark Streaming just uses it as is. What do you want to customize it to? On Tue, Nov 4, 2014 at 9:22 AM, Mohit Jaggi mohitja...@gmail.com wrote: Folks, If I have an RDD persisted in MEMORY_ONLY_SER mode and then it is needed for a transformation/action later, is the whole partition of the RDD deserialized into Java objects first before my transform/action code works on it? Or is it deserialized in a streaming manner as the iterator moves over the partition? Is this behavior customizable? I generally use the Kryo serializer. Mohit.
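For context, a minimal sketch of the setup being discussed (the app name and data are illustrative): persist an RDD serialized with Kryo, then access it through mapPartitions, where elements are deserialized as the iterator advances.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("SerializedPersistExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 1000000).map(_.toDouble)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)  // stored as serialized bytes

// Elements are deserialized as the iterator is consumed, not all at once.
val sums = rdd.mapPartitions(iter => Iterator(iter.sum))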
RE: Kafka Consumer in Spark Streaming
If you're running in standalone mode, the logs are under the SPARK_HOME/work/ directory. I'm not sure about YARN or Mesos; you can check the Spark documentation for the details. Thanks Jerry From: Something Something [mailto:mailinglist...@gmail.com] Sent: Wednesday, November 05, 2014 2:28 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: Re: Kafka Consumer in Spark Streaming The Kafka broker definitely has messages coming in. But your #2 point is valid. Needless to say I am a newbie to Spark. I can't figure out where the 'executor' logs would be. How would I find them? All I see printed on my screen is this:

14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started
14/11/04 22:21:23 INFO Remoting: Starting remoting
14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@mymachie:60743]
14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@mymachine:60743]
14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
---
Time: 141516852 ms
---
---
Time: 141516852 ms
---

Keeps repeating this... On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, would you mind describing your problem a little more specifically? 1. Does the Kafka broker currently have any data feeding in? 2. This code will print the lines, but not on the driver side; the code runs on the executor side, so you can check the logs in the worker dir to see if there are any printed lines under that folder. 3. Did you see any exceptions when running the app? This will help to pin down the problem. Thanks Jerry From: Something Something [mailto:mailinglist...@gmail.com] Sent: Wednesday, November 05, 2014 1:57 PM To: user@spark.apache.org Subject: Kafka Consumer in Spark Streaming I've the following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks.

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000));
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "machine:2181", "1", map);
JavaDStream<String> statuses = tweets.map(
    new Function<String, String>() {
        public String call(String status) {
            System.out.println(status);
            return status;
        }
    }
);
RE: FileNotFoundException in appcache shuffle files
Hi Ryan, This is an issue from sort-based shuffle, not consolidated hash-based shuffle. I guess this issue mostly occurs when the Spark cluster is in an abnormal situation, maybe a long GC pause or something else; you can check the system status and whether there are any other exceptions besides this one. Thanks Jerry From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of Ryan Williams Sent: Wednesday, October 29, 2014 1:31 PM To: user Subject: FileNotFoundException in appcache shuffle files My job is failing with the following error:

14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
  java.io.FileOutputStream.open(Native Method)
  java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
  org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
  org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)
  org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)
  org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)
  org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
  org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  org.apache.spark.scheduler.Task.run(Task.scala:56)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 task-1543 failures are a few instances of this failure on another task. Here is the entire App Master stdout dump: https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0 (~2MB; stack traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}. Here's a summary of the RDD manipulations I've done up to the point of failure:
* val A = [read a file in 1419 shards]
  * the file is 177GB compressed but ends up being ~5TB uncompressed / hydrated into scala objects (I think; see below for more discussion on this point).
* some relevant Spark options:
  * spark.default.parallelism=2000
  * --master yarn-client
  * --executor-memory 50g
  * --driver-memory 10g
  * --num-executors 100
  * --executor-cores 4
* A.repartition(3000)
  * 3000 was chosen in an attempt to mitigate shuffle-disk-spillage that previous job attempts with 1000 or 1419 shards were mired in
* A.persist()
* A.count() // succeeds
  * screenshot of web UI with stats: http://cl.ly/image/3e130w3J1B2v
  * I don't know why each task reports 8 TB of Input; that metric seems like it is always ludicrously high and I don't pay attention to it typically.
  * Each task shuffle-writes 3.5GB, for a total of 4.9TB
    * Does that mean that 4.9TB is the uncompressed size of the file that A was read from?
    * 4.9TB is pretty close to the total amount of memory I've configured the job to use: (50GB/executor) * (100 executors) ~= 5TB.
    * Is that a coincidence, or are my executors shuffle-writing an amount equal to all of their memory for some reason?
* val B = A.groupBy(...).filter(_._2.size == 2).map(_._2).flatMap(x => x).persist()
  * my expectation is that ~all elements pass the filter step, so B should be ~equal to A, just to give a sense of the expected memory blowup.
* B.count()
  * this fails while executing .groupBy(...) above

I've found a few discussions of issues whose manifestations look *like* this, but nothing that is obviously the same issue. The
RE: RDD to DStream
Hi Jianshi, For simulation purposes, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or a series of RDDs into a DStream; the first outputs the same RDD in each batch duration, and the second outputs one RDD from a queue in each batch duration. You can take a look at them (a short sketch follows this thread). For your case, I think TD's comments are quite meaningful: it's not trivial to do so, it often requires a job to scan all the records, and it's also not the design purpose of Spark Streaming, so I guess it's hard to achieve exactly what you want. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 1:42 PM To: Tathagata Das Cc: Aniket Bhatnagar; user@spark.apache.org Subject: Re: RDD to DStream I have a similar requirement. But instead of grouping it by chunkSize, I would have the timestamp be part of the data. So the function I want has the following signature:

// RDD of (timestamp, value)
def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: StreamingContext): DStream[T]

And the DStream should respect the timestamp part. This is important for simulation, right? Do you have any good solution for this? Jianshi On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Aniket, Great thoughts! I understand the use case. But as you have realized yourself, it is not trivial to cleanly stream an RDD as a DStream. Since RDD operations are defined to be scan based, it is not efficient to define RDDs based on slices of data within a partition of another RDD using pure RDD transformations. What you have done is a decent, and probably the only feasible, solution, with its limitations. Also the requirements of converting a batch of data to a stream of data can be pretty diverse. What rate, what # of events per batch, how many batches, is it efficient? Hence, it is not trivial to define a good, clean public API for that. If anyone has any thoughts, ideas, etc. on this, you are more than welcome to share them. TD On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: The use case for converting an RDD into a DStream is that I want to simulate a stream from already persisted data for testing analytics. It is trivial to create an RDD from any persisted data, but not so much for a DStream. Hence my idea to create a DStream from an RDD. For example, let's say you are trying to implement analytics on time series data using the Lambda architecture. This means you would have to implement the same analytics on streaming data (in streaming mode) as well as on persisted data (in batch mode). The workflow for implementing the analytics would be to first implement them in batch mode using RDD operations and then simulate a stream to test the analytics in stream mode. The simulated stream should produce the elements at a specified rate. So the solution may be to read data into an RDD, split (chunk) it into multiple RDDs, with each RDD having the size of elements that need to be streamed per time unit, and then finally stream each RDD using the compute function. The problem with using QueueInputDStream is that it will stream data as per the batch duration specified in the streaming context and one cannot specify a custom slide duration. Moreover, the class QueueInputDStream is private to the streaming package, so I can't really use it/extend it from an external package.
Also, I could not find a good solution to split an RDD into equally sized smaller RDDs that could be fed into an extended version of QueueInputDStream. Finally, here is what I came up with:

class RDDExtension[T: ClassTag](rdd: RDD[T]) {
  def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {
      private val iterator = rdd.toLocalIterator // WARNING: each partition must fit in RAM of local machine.
      private val grouped = iterator.grouped(chunkSize)
      override def start(): Unit = {}
      override def stop(): Unit = {}
      override def compute(validTime: Time): Option[RDD[T]] = {
        if (grouped.hasNext) {
          Some(rdd.sparkContext.parallelize(grouped.next()))
        } else {
          None
        }
      }
      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration)).getOrElse(super.slideDuration)
      }
    }
  }
}

This aims to stream chunkSize elements every slideDurationMilli milliseconds (defaulting to the batch size in the streaming context). It's still not perfect (for example, the streaming is not precise), but given that this will only be used for testing purposes, I am not looking for ways to further optimize it. Thanks, Aniket On 2 August 2014 04:07, Mayur Rustagi mayur.rust...@gmail.com wrote: Nice question :) Ideally you should use a queuestream interface to push
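A minimal sketch of the QueueInputDStream route Jerry mentioned at the top of this thread, assuming an existing StreamingContext ssc and a pre-chunked sequence of RDDs named chunks (both names are illustrative); each queued RDD is emitted in one batch interval:

import scala.collection.mutable
import org.apache.spark.rdd.RDD

val queue = new mutable.Queue[RDD[String]]()
// oneAtATime = true emits exactly one queued RDD per batch interval.
val stream = ssc.queueStream(queue, oneAtATime = true)
queue ++= chunks  // chunks: Seq[RDD[String]], prepared elsewhere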
RE: RDD to DStream
I think your solution may not scale if the data size increases, since you have to collect all your data back to the driver node, so driver memory will be a problem. Why not filter out each specific time range as an RDD? After filtering over the whole time range, you will get a series of RDDs divided by timestamp, which can then be fed into the queue (see the sketch after this message). Still, it is not an efficient way, but it is not limited by driver memory. There may also be other solutions, like using a shuffle to arrange the data, but you cannot avoid scanning the whole data set. Basically, we need to avoid fetching large amounts of data back to the driver. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 2:39 PM To: Shao, Saisai Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com) Subject: Re: RDD to DStream Hi Saisai, I understand it's non-trivial, but the requirement of simulating offline data as a stream is also fair. :) I just wrote a prototype; however, I need to do a collect and a bunch of parallelize calls...

// RDD of (timestamp, value)
def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: StreamingContext): DStream[T] = {
  val sc = ssc.sparkContext
  val d = data.groupBy(_._1 / timeWindow)
    .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
    .collect()
    .map(e => (e._1, sc.parallelize(e._2)))
    .sortBy(_._1)
  val queue = new mutable.SynchronizedQueue[RDD[T]]
  queue ++= d.map(_._2)
  ssc.queueStream(queue)
}

Any way to get a list of RDDs sorted by group key just after groupBy? Jianshi On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, For simulation purposes, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or a series of RDDs into a DStream; the first outputs the same RDD in each batch duration, and the second outputs one RDD from a queue in each batch duration. You can take a look at them. For your case, I think TD's comments are quite meaningful: it's not trivial to do so, it often requires a job to scan all the records, and it's also not the design purpose of Spark Streaming, so I guess it's hard to achieve exactly what you want. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 1:42 PM To: Tathagata Das Cc: Aniket Bhatnagar; user@spark.apache.org Subject: Re: RDD to DStream I have a similar requirement. But instead of grouping it by chunkSize, I would have the timestamp be part of the data. So the function I want has the following signature:

// RDD of (timestamp, value)
def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: StreamingContext): DStream[T]

And the DStream should respect the timestamp part. This is important for simulation, right? Do you have any good solution for this? Jianshi On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Aniket, Great thoughts! I understand the use case. But as you have realized yourself, it is not trivial to cleanly stream an RDD as a DStream. Since RDD operations are defined to be scan based, it is not efficient to define RDDs based on slices of data within a partition of another RDD using pure RDD transformations. What you have done is a decent, and probably the only feasible, solution, with its limitations. Also the requirements of converting a batch of data to a stream of data can be pretty diverse.
What rate, what # of events per batch, how many batches, is it efficient? Hence, it is not trivial to define a good, clean public API for that. If anyone has any thoughts, ideas, etc. on this, you are more than welcome to share them. TD On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: The use case for converting an RDD into a DStream is that I want to simulate a stream from already persisted data for testing analytics. It is trivial to create an RDD from any persisted data, but not so much for a DStream. Hence my idea to create a DStream from an RDD. For example, let's say you are trying to implement analytics on time series data using the Lambda architecture. This means you would have to implement the same analytics on streaming data (in streaming mode) as well as on persisted data (in batch mode). The workflow for implementing the analytics would be to first implement them in batch mode using RDD operations and then simulate a stream to test the analytics in stream mode. The simulated stream should produce the elements at a specified rate. So the solution may be to read data into an RDD, split (chunk) it into multiple RDDs, with each RDD having the size of elements that need to be streamed per time unit, and then finally stream each RDD using the compute function
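A minimal sketch of Jerry's filter-based alternative (data, T, and the window size w are illustrative names): only the distinct bucket keys are collected to the driver, not the data itself, and each bucket becomes its own RDD by filtering.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def rddsByWindow[T: ClassTag](data: RDD[(Long, T)], w: Long): Seq[RDD[T]] = {
  // Collect only the (small) set of distinct window keys to the driver.
  val keys = data.map(_._1 / w).distinct().collect().sorted
  // One filter pass per window; the data itself stays distributed.
  keys.map(k => data.filter(_._1 / w == k).sortBy(_._1).map(_._2))
}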
RE: RDD to DStream
I think what you want is to make each bucket a new RDD, as in the Pig syntax you mentioned:

gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket

From my understanding, currently there is no such API in Spark to achieve this; maybe you would have to create a customized RDD yourself. As for why the code cannot be executed: the expression sc.parallelize(_._2.sortBy(_._1)) will be serialized as a closure to execute on the remote side, which obviously does not have a SparkContext; Spark cannot support nested RDDs in a closure. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 3:30 PM To: Shao, Saisai Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com) Subject: Re: RDD to DStream Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey() // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]

Jianshi On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: You're absolutely right, it's not 'scalable' as I'm using collect(). However, it's important to have the RDDs ordered by the timestamp of the time window (groupBy puts data into the corresponding time window). It's fairly easy to do in Pig, but somehow I have no idea how to express it with RDDs... Something like this (in Pig, pseudo code):

g = GROUP data BY (timestamp / windowSize) // group data into buckets in the same time window
gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the ORDER BY part with RDDs. Jianshi On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote: I think your solution may not scale if the data size increases, since you have to collect all your data back to the driver node, so driver memory will be a problem. Why not filter out each specific time range as an RDD? After filtering over the whole time range, you will get a series of RDDs divided by timestamp, which can then be fed into the queue. Still, it is not an efficient way, but it is not limited by driver memory. There may also be other solutions, like using a shuffle to arrange the data, but you cannot avoid scanning the whole data set. Basically, we need to avoid fetching large amounts of data back to the driver.
Jianshi On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, For simulation purposes, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or a series of RDDs into a DStream; the first outputs the same RDD in each batch duration, and the second outputs one RDD from a queue in each batch duration. You can take a look at them. For your case, I think TD's comments are quite meaningful: it's not trivial to do so, it often requires a job to scan all the records, and it's also not the design purpose of Spark Streaming, so I guess it's hard to achieve exactly what you want. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 1:42 PM To: Tathagata Das Cc: Aniket Bhatnagar; user@spark.apache.org Subject: Re: RDD to DStream I have a similar requirement. But instead of grouping it by chunkSize, I would have the timestamp be part of the data. So the function I want has the following signature:

// RDD of (timestamp, value)
def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: StreamingContext): DStream[T]

And the DStream should respect the timestamp part. This is important for simulation, right? Do
RE: RDD to DStream
Yes, I understand what you want, but it may be hard to achieve without collecting data back to the driver node. Besides, can we think of another way to do it? Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 4:07 PM To: Shao, Saisai Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com) Subject: Re: RDD to DStream Yeah, you're absolutely right, Saisai. My point is that we should allow this kind of logic on RDDs, let's say transforming an RDD[(Key, Iterable[T])] into a Seq[(Key, RDD[T])]. Makes sense? Jianshi On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you want is to make each bucket a new RDD, as in the Pig syntax you mentioned:

gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket

From my understanding, currently there is no such API in Spark to achieve this; maybe you would have to create a customized RDD yourself. As for why the code cannot be executed: the expression sc.parallelize(_._2.sortBy(_._1)) will be serialized as a closure to execute on the remote side, which obviously does not have a SparkContext; Spark cannot support nested RDDs in a closure. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 3:30 PM To: Shao, Saisai Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com) Subject: Re: RDD to DStream Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey() // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]

Jianshi On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: You're absolutely right, it's not 'scalable' as I'm using collect(). However, it's important to have the RDDs ordered by the timestamp of the time window (groupBy puts data into the corresponding time window). It's fairly easy to do in Pig, but somehow I have no idea how to express it with RDDs... Something like this (in Pig, pseudo code):

g = GROUP data BY (timestamp / windowSize) // group data into buckets in the same time window
gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the ORDER BY part with RDDs. Jianshi On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote: I think your solution may not scale if the data size increases, since you have to collect all your data back to the driver node, so driver memory will be a problem. Why not filter out each specific time range as an RDD? After filtering over the whole time range, you will get a series of RDDs divided by timestamp, which can then be fed into the queue. Still, it is not an efficient way, but it is not limited by driver memory. There may also be other solutions, like using a shuffle to arrange the data, but you cannot avoid scanning the whole data set. Basically, we need to avoid fetching large amounts of data back to the driver.
Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Monday, October 27, 2014 2:39 PM To: Shao, Saisai Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com) Subject: Re: RDD to DStream Hi Saisai, I understand it's non-trivial, but the requirement of simulating offline data as a stream is also fair. :) I just wrote a prototype; however, I need to do a collect and a bunch of parallelize calls...

// RDD of (timestamp, value)
def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: StreamingContext): DStream[T] = {
  val sc = ssc.sparkContext
  val d = data.groupBy(_._1 / timeWindow)
    .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
    .collect()
    .map(e => (e._1, sc.parallelize(e._2)))
    .sortBy(_._1)
  val queue = new mutable.SynchronizedQueue[RDD[T]]
  queue ++= d.map(_._2)
  ssc.queueStream(queue)
}

Any way to get a list of RDDs sorted by group key just after groupBy? Jianshi On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, For simulation purposes, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or a series of RDDs into a DStream; the first outputs the same RDD in each batch duration, and the second outputs one RDD from a queue in each batch duration. You can take a look at them. For your case, I think TD's comments are quite meaningful: it's not trivial to do so, it often requires a job to scan all the records, and it's also
RE: Sort-based shuffle did not work as expected
Hi, Probably the problem you met is related to this JIRA ticket (https://issues.apache.org/jira/browse/SPARK-3948). It's potentially a kernel 2.6.32 bug which can make sort-based shuffle fail. I'm not sure your problem is the same as this one; would you mind checking your kernel version? Thanks Jerry From: su...@certusnet.com.cn [mailto:su...@certusnet.com.cn] Sent: Monday, October 27, 2014 5:41 PM To: user Subject: Sort-based shuffle did not work as expected Hi, all We expected to utilize sort-based shuffle in our Spark application but encountered unhandled problems. It seems that the data file and index file are not in a consistent state, and we got wrong result sets when trying to use Spark to bulk load data into HBase. There are many fetch failures like the following:

FetchFailed(BlockManagerId(0, work5.msa.certusnet, 44544, 0), shuffleId=0, mapId=42, reduceId=3)

Referring to the executor log, we caught the following exceptions:

14/10/27 11:20:36 ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(work4.msa.certusnet,53616)
java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error
  at org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:869)
  at org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:861)
  at org.apache.spark.network.ConnectionManager$MessageStatus.markDone(ConnectionManager.scala:66)
  at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:660)
  at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)

14/10/27 11:00:50 ERROR BlockManagerWorker: Exception handling buffer message
java.io.IOException: Channel not open for writing - cannot extend file to required size
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:851)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
  at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:379)
  at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
  at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
  at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
  at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
  at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)

Any suggestion? Thanks Sun CertusNet
RE: Spark Hive Snappy Error
Thanks a lot, I will try to reproduce this in my local setup and dig into the details. Thanks for your information. BR Jerry From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com] Sent: Wednesday, October 22, 2014 8:35 PM To: Shao, Saisai Cc: arthur.hk.c...@gmail.com; user Subject: Re: Spark Hive Snappy Error Hi, Yes, I can always reproduce the issue. About your questions on workload, Spark configuration, JDK version and OS version: I ran SparkPi 1000.

java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)

cat /etc/centos-release
CentOS release 6.5 (Final)

My Spark's hive-site.xml contains the following:

<property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>

e.g. MASTER=spark://m1:7077,m2:7077 ./bin/run-example SparkPi 1000

2014-10-22 20:23:17,033 ERROR [sparkDriver-akka.actor.default-dispatcher-18] actor.ActorSystemImpl (Slf4jLogger.scala:apply$mcV$sp(66)) - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver]
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
  at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
  at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
  at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
  at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
  at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
  at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2014-10-22 20:23:17,036 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Failed to run reduce at SparkPi.scala:35
Exception in thread "main"
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
  at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
  at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
  at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
  at akka.actor.ActorCell.terminate(ActorCell.scala:338)
  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
  at akka.dispatch.Mailbox.run
RE: Spark Hive Snappy Error
Seems you just added the snappy library into your classpath: export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar. But Spark itself depends on snappy-0.2.jar. Is there any possibility that this problem is caused by different versions of snappy? Thanks Jerry From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com] Sent: Thursday, October 23, 2014 11:32 AM To: Shao, Saisai Cc: arthur.hk.c...@gmail.com; user Subject: Re: Spark Hive Snappy Error Hi, Please find the attached file. My spark-default.xml:

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
#
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.executor.memory 2048m
spark.shuffle.spill.compress false
spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec

My spark-env.sh:

#!/usr/bin/env bash
export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop}
export SPARK_WORKER_DIR=/edh/hadoop_data/spark_work/
export SPARK_LOG_DIR=/edh/hadoop_logs/spark
export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
export SPARK_CLASSPATH=$SPARK_HOME/lib_managed/jars/mysql-connector-java-5.1.31-bin.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HBASE_HOME/lib/*:$HIVE_HOME/csv-serde-1.1.2-0.11.0-all.jar:
export SPARK_WORKER_MEMORY=2g
export HADOOP_HEAPSIZE=2000
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=m35:2181,m33:2181,m37:2181"
export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"

ll $HADOOP_HOME/lib/native/Linux-amd64-64
-rw-rw-r--. 1 tester tester 50523 Aug 27 14:12 hadoop-auth-2.4.1.jar
-rw-rw-r--. 1 tester tester 1062640 Aug 27 12:19 libhadoop.a
-rw-rw-r--. 1 tester tester 1487564 Aug 27 11:14 libhadooppipes.a
lrwxrwxrwx. 1 tester tester 24 Aug 27 07:08 libhadoopsnappy.so -> libhadoopsnappy.so.0.0.1
lrwxrwxrwx. 1 tester tester 24 Aug 27 07:08 libhadoopsnappy.so.0 -> libhadoopsnappy.so.0.0.1
-rwxr-xr-x. 1 tester tester 54961 Aug 27 07:08 libhadoopsnappy.so.0.0.1
-rwxrwxr-x. 1 tester tester 630328 Aug 27 12:19 libhadoop.so
-rwxrwxr-x. 1 tester tester 630328 Aug 27 12:19 libhadoop.so.1.0.0
-rw-rw-r--. 1 tester tester 582472 Aug 27 11:14 libhadooputils.a
-rw-rw-r--. 1 tester tester 298626 Aug 27 11:14 libhdfs.a
-rwxrwxr-x. 1 tester tester 200370 Aug 27 11:14 libhdfs.so
-rwxrwxr-x. 1 tester tester 200370 Aug 27 11:14 libhdfs.so.0.0.0
lrwxrwxrwx. 1 tester tester 55 Aug 27 07:08 libjvm.so -> /usr/lib/jvm/jdk1.6.0_45/jre/lib/amd64/server/libjvm.so
lrwxrwxrwx. 1 tester tester 25 Aug 27 07:08 libprotobuf-lite.so -> libprotobuf-lite.so.8.0.0
lrwxrwxrwx. 1 tester tester 25 Aug 27 07:08 libprotobuf-lite.so.8 -> libprotobuf-lite.so.8.0.0
-rwxr-xr-x. 1 tester tester 964689 Aug 27 07:08 libprotobuf-lite.so.8.0.0
lrwxrwxrwx. 1 tester tester 20 Aug 27 07:08 libprotobuf.so -> libprotobuf.so.8.0.0
lrwxrwxrwx. 1 tester tester 20 Aug 27 07:08 libprotobuf.so.8 -> libprotobuf.so.8.0.0
-rwxr-xr-x. 1 tester tester 8300050 Aug 27 07:08 libprotobuf.so.8.0.0
lrwxrwxrwx. 1 tester tester 18 Aug 27 07:08 libprotoc.so -> libprotoc.so.8.0.0
lrwxrwxrwx. 1 tester tester 18 Aug 27 07:08 libprotoc.so.8 -> libprotoc.so.8.0.0
-rwxr-xr-x. 1 tester tester 9935810 Aug 27 07:08 libprotoc.so.8.0.0
-rw-r--r--. 1 tester tester 233554 Aug 27 15:19 libsnappy.a
lrwxrwxrwx. 1 tester tester 23 Aug 27 11:32 libsnappy.so -> /usr/lib64/libsnappy.so
lrwxrwxrwx. 1 tester tester 23 Aug 27 11:33 libsnappy.so.1 -> /usr/lib64/libsnappy.so
-rwxr-xr-x. 1 tester tester 147726 Aug 27 07:08 libsnappy.so.1.2.0
drwxr-xr-x. 2 tester tester 4096 Aug 27 07:08 pkgconfig

Regards Arthur On 23 Oct, 2014, at 10:57 am, Shao, Saisai saisai.s...@intel.com wrote: Hi Arthur, I think your problem might be different from what SPARK-3958 (https://issues.apache.org/jira/browse/SPARK-3958) mentioned; your problem seems more likely to be a library link problem. Would you mind checking your Spark runtime to see whether snappy.so is loaded or not (through lsof -p)? I guess your problem is more likely to be a "library not found" problem. Thanks Jerry
RE: Shuffle files
Hi Song, From what I know about sort-based shuffle: the number of files opened in parallel is normally much smaller than in hash-based shuffle. In hash-based shuffle, the number of files opened in parallel is C * R (where C is the number of cores used and R is the number of reducers); as you can see, the file count is tied to the reducer number, no matter how large the shuffle size is. In sort-based shuffle, there is only one final map output file; to achieve this we do per-partition sorting, which generates some intermediate spill files, but the number of spilled files is related to the shuffle size and the memory available for shuffle, not to the reducer number. So if you hit "too many open files" in sort-based shuffle, I guess you have many spilled files during shuffle write; one possible way to alleviate this is to increase the shuffle memory usage, and raising the ulimit is another. I guess on Yarn you have to do the system configuration manually; Spark cannot set the ulimit automatically for you, and I don't think that is an issue Spark should take care of. Thanks Jerry From: Chen Song [mailto:chen.song...@gmail.com] Sent: Tuesday, October 21, 2014 9:10 AM To: Andrew Ash Cc: Sunny Khatri; Lisonbee, Todd; u...@spark.incubator.apache.org Subject: Re: Shuffle files My observation is the opposite. When my job runs under the default spark.shuffle.manager, I don't see this exception. However, when it runs with SORT based, I start seeing this error. How would that be possible? I am running my job in YARN, and I noticed that the YARN process limits (cat /proc/$PID/limits) are not consistent with the system-wide limits (shown by ulimit -a); I don't know how that happened. Is there a way to let the Spark driver propagate this setting (ulimit -n number) to the Spark executors before startup? On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash and...@andrewash.com wrote: You will need to restart your Mesos workers to pick up the new limits as well. On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri sunny.k...@gmail.com wrote: @SK: Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit -a. Also make sure you have the proper kernel parameters set in /etc/sysctl.conf (MacOSX) On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd todd.lison...@intel.com wrote: Are you sure the new ulimit has taken effect? How many cores are you using? How many reducers? "In general if a node in your cluster has C assigned cores and you run a job with X reducers then Spark will open C*X files in parallel and start writing. Shuffle consolidation will help decrease the total number of files created but the number of file handles open at any time doesn't change so it won't help the ulimit problem." Quoted from Patrick at: http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html Thanks, Todd -Original Message- From: SK [mailto:skrishna...@gmail.com] Sent: Tuesday, October 7, 2014 2:12 PM To: u...@spark.incubator.apache.org Subject: Re: Shuffle files - We set ulimit to 50. But I still get the same too many open files warning. - I tried setting consolidateFiles to True, but that did not help either. I am using a Mesos cluster. Does Mesos have any limit on the number of open files?
thanks
-- Chen Song
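For reference, the two mitigations Jerry mentions translate into configuration along these lines; this is a sketch with illustrative values, using the Spark 1.x property names:

import org.apache.spark.SparkConf

// Sort-based shuffle produces one map output file per task; extra shuffle
// memory reduces the number of intermediate spill files. The ulimit itself
// still has to be raised at the OS level on each node.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "SORT")
  .set("spark.shuffle.memoryFraction", "0.4")    // default is 0.2
  .set("spark.shuffle.consolidateFiles", "true") // lowers hash-shuffle file counts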
RE: Spark Hive Snappy Error
Hi Arthur, I think this is a known issue in Spark; you can check (https://issues.apache.org/jira/browse/SPARK-3958). I'm curious about it: can you always reproduce this issue? Is it related to some specific data sets? Would you mind giving me some information about your workload, Spark configuration, JDK version and OS version? Thanks Jerry From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com] Sent: Friday, October 17, 2014 7:13 AM To: user Cc: arthur.hk.c...@gmail.com Subject: Spark Hive Snappy Error Hi, When trying Spark with a Hive table, I got the "java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I" error:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("select count(1) from q8_national_market_share").collect().foreach(println)

java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316) at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79) at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) at org.apache.spark.sql.hive.HadoopTableReader.<init>(TableReader.scala:68) at org.apache.spark.sql.hive.execution.HiveTableScan.<init>(HiveTableScan.scala:68) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188) at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:364) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:184) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:146) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15) at $iwC$$iwC$$iwC.<init>(<console>:20) at $iwC$$iwC.<init>(<console>:22) at $iwC.<init>(<console>:24) at <init>(<console>:26) at .<init>(<console>:30) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at
RE: Spark Streaming KafkaUtils Issue
Hi Abraham, You are correct: the "auto.offset.reset" behavior in KafkaReceiver is different from original Kafka semantics. If you set this configuration, KafkaReceiver will clean the related ZK metadata immediately, but for Kafka this configuration is just a hint which takes effect only when the offset is out of range. So you will always read data from the beginning if you set it to "smallest", and you will always get data from the end immediately if you set it to "largest". There's a JIRA and a PR to follow this, but they are still not merged to master; you can check (https://issues.apache.org/jira/browse/SPARK-2492). Thanks Jerry From: Abraham Jacob [mailto:abe.jac...@gmail.com] Sent: Saturday, October 11, 2014 6:57 AM To: Sean McNamara Cc: user@spark.apache.org Subject: Re: Spark Streaming KafkaUtils Issue Probably this is the issue - http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/ "Spark's usage of the Kafka consumer parameter auto.offset.reset (http://kafka.apache.org/documentation.html#consumerconfigs) is different from Kafka's semantics. In Kafka, the behavior of setting auto.offset.reset to 'smallest' is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = smallest, your application will completely re-process all available Kafka data. Doh!" See this discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and that discussion (http://markmail.org/message/257a5l3oqyftsjxj). Hmm interesting... Wondering what happens if I set it as largest...? On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob abe.jac...@gmail.com wrote: Sure... I do set the group.id for all the consumers to be the same.
Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
  kafkaStreams.add(
    KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class,
        PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
      .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
        private static final long serialVersionUID = -1936810126415608167L;
        public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
          return tuple2;
        }
      }));
}
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
  unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
  unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();

-abe On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara sean.mcnam...@webtrends.com wrote: Would you mind sharing the code leading to your createStream? Are you also setting group.id? Thanks, Sean On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote: Hi Folks, I am seeing some strange behavior when using the Spark Kafka connector in Spark streaming. I have a Kafka topic which has 8 partitions. I have a kafka producer that pumps some messages into this topic. On the consumer side I have a spark streaming application that has 8 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same kafka group id connected to the 8 partitions I have for the topic. Also, the kafka consumer property auto.offset.reset is set to "smallest". Now here is the sequence of steps - (1) I start the spark streaming app. (2) Start the producer. At this point I see the messages that are being pumped from the producer in Spark Streaming. Then I - (1) Stopped the producer (2) Waited for all the messages to be consumed (3) Stopped the spark streaming app. Now when I restart the spark streaming app (note - the
RE: Spark Streaming KafkaUtils Issue
Hi abe, You can see the details in KafkaInputDStream.scala; here is the snippet:

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
  tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

KafkaReceiver checks your kafkaParams: if "auto.offset.reset" is set, it will clean the ZK metadata immediately, so you will always read data from the beginning (set to "smallest") or from the end (set to "largest") immediately, because the ZK metadata has been deleted beforehand. If you do not set this parameter, this code path is not triggered, so data will be read from the last commit point. And if a last commit point is not yet available, Kafka will move the offset to the end of the partition (Kafka sets "auto.offset.reset" to "largest" by default). If you want to keep the same semantics as Kafka, you need to remove the above code path manually and recompile Spark. Thanks Jerry From: Abraham Jacob [mailto:abe.jac...@gmail.com] Sent: Saturday, October 11, 2014 8:49 AM To: Shao, Saisai Cc: user@spark.apache.org; Sean McNamara Subject: Re: Spark Streaming KafkaUtils Issue Thanks Jerry, So, from what I can understand from the code, if I leave out auto.offset.reset, it should theoretically read from the last commit point... Correct? -abe
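In other words, to get the Kafka-like resume-from-last-commit behavior with the stock receiver, simply leave auto.offset.reset out of the params. A minimal Scala sketch of that setup (the ZK address, group id and topic name are placeholders, and ssc is assumed to be an existing StreamingContext):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// No "auto.offset.reset" entry: the ZK cleanup path quoted above is skipped,
// so consumption resumes from the last committed offset.
val kafkaParams = Map(
  "zookeeper.connect" -> "zkhost:2181",
  "group.id"          -> "consumerGrp")
val stream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc, kafkaParams, Map("topic" -> 1), StorageLevel.MEMORY_ONLY_SER)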
RE: Error reading from Kafka
Hi, I think you have to change the code to specify the type info, like this:

val kafkaStream: ReceiverInputDStream[(String, String)] =
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

You can give it a try; actually the Kafka unit tests also use this API and work fine. Besides, the fixed issue you mention below only occurs when Java code calls the related API. Thanks Jerry From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com] Sent: Wednesday, October 08, 2014 10:04 PM To: user@spark.apache.org Subject: Error reading from Kafka Hi, I'm trying to read from Kafka. I was able to do it correctly with this method:

def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)]

But now I have to add some params to the kafka consumer, so I've changed to the other createStream method, but I'm getting an error: 14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0 14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Class.java:2849) at java.lang.Class.getConstructor(Class.java:1718) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) This is my code. It seems that createStream returns ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (String, String)), so I think it tries to getConstructor(kafka.utils.VerifiableProperties) by reflection on the Nothing object and doesn't find the method.

val topics = config.getString("nessus.kafka.topics")
val numThreads = config.getInt("nessus.kafka.numThreads")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-grp")
val kafkaStream: ReceiverInputDStream[(String, String)] =
  KafkaUtils.createStream(ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

I found the issue https://issues.apache.org/jira/browse/SPARK-2103, but it was solved, and I'm using spark 1.1.0 and scala 2.10, so I don't know what happens. Any thoughts? -- Avenida de Europa, 26. Ática 5. 3ª Planta, 28224 Pozuelo de Alarcón, Madrid. Tel: +34 91 352 59 42 // @StratioBD (https://twitter.com/StratioBD)
RE: problem with data locality api
Hi, The first conf is used by Hadoop to determine the locality distribution of the HDFS file. The second conf is used by Spark; though they have the same name, they are actually two different classes. Thanks Jerry From: qinwei [mailto:wei@dewmobile.net] Sent: Sunday, September 28, 2014 2:05 PM To: user Subject: problem with data locality api Hi, everyone I came across a problem about data locality. I found this example code in 《Spark-on-YARN-A-Deep-Dive-Sandy-Ryza.pdf》:

val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
val sc = new SparkContext(conf, locData)

But I found the two confs above are of different types: the conf in the first line is of type org.apache.hadoop.conf.Configuration, and the conf in the second line is of type SparkConf. Can anyone explain that to me or give me some example code? qinwei
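To make the distinction concrete, a sketch with both types spelled out (hedged: the exact InputFormatInfo constructor details vary across Spark versions, so treat this as illustrating the two conf classes rather than a fixed API):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.InputFormatInfo

// Hadoop configuration: used to compute the HDFS file's locality distribution.
val hadoopConf = new Configuration()
val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(hadoopConf, classOf[TextInputFormat], "hdfs:///myfile.txt")))

// Spark configuration: a different class, despite the similar variable name.
val sparkConf = new SparkConf().setAppName("locality-demo")
val sc = new SparkContext(sparkConf, locData)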
RE: sortByKey trouble
Hi, sortByKey is only for RDD[(K, V)]: each tuple can have only two members, and Spark will sort by the first member. If you want to use sortByKey, you have to change your RDD[(String, String, String, String)] into RDD[(String, (String, String, String))]. Thanks Jerry -Original Message- From: david [mailto:david...@free.fr] Sent: Wednesday, September 24, 2014 4:30 PM To: u...@spark.incubator.apache.org Subject: sortByKey trouble Hi, Does anybody know how to use sortByKey in Scala on an RDD like:

val rddToSave = file.map(l => l.split("\\|")).map(r => (r(34) + "-" + r(3), r(4), r(10), r(12)))

because I received an error: sortByKey is not a member of org.apache.spark.rdd.RDD[(String,String,String,String)]. What I am trying to do is sort on the first element. Thanks
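A sketch of Jerry's suggestion applied to David's snippet: the first field stays as the key and the remaining three are packed into the value, which makes sortByKey available. (On Spark 1.x you could alternatively keep the 4-tuple and call rdd.sortBy(_._1).)

// Restructure the 4-tuple into (key, value) so sortByKey applies.
val rddToSave = file.map(l => l.split("\\|"))
  .map(r => (r(34) + "-" + r(3), (r(4), r(10), r(12))))
val sorted = rddToSave.sortByKey() // sorts by the String key, ascending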
RE: spark.local.dir and spark.worker.dir not used
Hi, spark.local.dir is the one used to write map output data and persisted RDD blocks, but the file paths are hashed, so you cannot directly find the persisted RDD block files; they will definitely be in these folders on your worker nodes, though. Thanks Jerry From: Priya Ch [mailto:learnings.chitt...@gmail.com] Sent: Tuesday, September 23, 2014 6:31 PM To: user@spark.apache.org; d...@spark.apache.org Subject: spark.local.dir and spark.worker.dir not used Hi, I am using spark 1.0.0. In my spark code I'm trying to persist an rdd to disk as rdd.persist(DISK_ONLY), but unfortunately I couldn't find the location where the rdd was written to disk. I specified SPARK_LOCAL_DIRS and SPARK_WORKER_DIR to some other location rather than using the default /tmp directory, but still couldn't see anything in the worker directory and spark local directory. I also tried specifying the local dir and worker dir from the spark code while defining the SparkConf, as conf.set("spark.local.dir", "/home/padma/sparkdir"), but the directories are not used. In general, which directories does Spark use for map output files, intermediate writes and persisting rdds to disk? Thanks, Padma Ch
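One caveat worth adding here (an assumption about the setup above, not something stated in the thread): spark.local.dir is read when the SparkContext starts, and on the workers SPARK_LOCAL_DIRS in spark-env.sh takes precedence over the value set in code. A sketch:

import org.apache.spark.{SparkConf, SparkContext}

// spark.local.dir must be set before the context is created; setting it on a
// running application has no effect, and SPARK_LOCAL_DIRS on a worker wins.
val conf = new SparkConf()
  .setAppName("persist-demo")
  .set("spark.local.dir", "/home/padma/sparkdir") // path from the question above
val sc = new SparkContext(conf)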
RE: spark.local.dir and spark.worker.dir not used
This folder is created when you start your Spark application, under your spark.local.dir, with the name "spark-local-xxx" as prefix. It's quite strange that you don't see this folder; maybe you missed something. Besides, if Spark cannot create this folder on start, persisting an rdd to disk will fail. Also, I think there's no way to persist an RDD to HDFS, even on YARN; only an RDD's checkpoint can save data to HDFS. Thanks Jerry From: Chitturi Padma [mailto:learnings.chitt...@gmail.com] Sent: Tuesday, September 23, 2014 8:33 PM To: u...@spark.incubator.apache.org Subject: Re: spark.local.dir and spark.worker.dir not used I couldn't even see the spark-id folder in the default /tmp directory of local.dir. On Tue, Sep 23, 2014 at 6:01 PM, Priya Ch [hidden email] wrote: Is it possible to view the persisted RDD blocks? If I use YARN, RDD blocks would be persisted to hdfs; then will I be able to read the hdfs blocks as I could do in hadoop?
RE: how long does it take executing ./sbt/sbt assembly
If you have enough memory, the speed will be faster, within one minute, since most of the files are cached. Also, you can build your Spark project on a mounted ramfs in Linux; this will also speed up the process. Thanks Jerry -Original Message- From: Zhan Zhang [mailto:zzh...@hortonworks.com] Sent: Wednesday, September 24, 2014 1:11 PM To: christy Cc: u...@spark.incubator.apache.org Subject: Re: how long does it take executing ./sbt/sbt assembly Definitely something wrong. For me, 10 to 30 minutes. Thanks. Zhan Zhang On Sep 23, 2014, at 10:02 PM, christy 760948...@qq.com wrote: This process began yesterday and it has already run for more than 20 hours. Is it normal? Has anyone had the same problem? No error thrown out yet.
RE: Issues with partitionBy: FetchFailed
Hi, I've also met this problem before. I think you can try to set "spark.core.connection.ack.wait.timeout" to a larger value to avoid ack timeout; the default is 60 seconds. Sometimes, because of GC pauses or other reasons, the acknowledgement message will time out, which leads to this exception; you can try setting a larger value for this configuration. Thanks Jerry From: Julien Carme [mailto:julien.ca...@gmail.com] Sent: Sunday, September 21, 2014 7:43 PM To: user@spark.apache.org Subject: Issues with partitionBy: FetchFailed Hello, I am facing an issue with partitionBy; it is not clear whether it is a problem with my code or with my spark setup. I am using Spark 1.1, standalone, and my other spark projects work fine. I have to repartition a relatively large file (about 70 million lines). Here is a minimal version of what is not working fine:

myRDD = sc.textFile(...).map { line => (extractKey(line), line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)

It runs for quite some time, until I get some errors and it retries. The errors are: FetchFailed(BlockManagerId(3, myWorker2, 52082, 0), shuffleId=1, mapId=1, reduceId=5) The logs are not much more informative. I get: java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec I get similar errors with all my workers. Do you have some kind of explanation for this behaviour? What could be wrong? Thanks,
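Concretely, Jerry's suggestion is a one-line configuration change; a sketch (the value is in seconds and purely illustrative):

import org.apache.spark.SparkConf

// Raise the ack timeout so long GC pauses no longer surface as
// "ack was not received within 60 sec" / FetchFailed; default is 60.
val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "300")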
RE: Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..
Hi Rafeeq, I think this situation usually occurs when your Spark Streaming application is running abnormally. Would you mind checking your job processing time in the WebUI or log: is the total latency of job processing + job scheduling larger than the batch duration? If your Spark Streaming application is in this situation, you will meet this exception. Normally the reason this happens is that Spark Streaming jobs are processed one by one by default; if one job is blocked for a long time, the next job has to wait until the previous one is finished, but input blocks are deleted after a timeout, so when the delayed job is started, it cannot find the right block and throws this exception. You can also meet this exception in other abnormal situations. Anyway, this exception means your application is abnormal; you should pay attention to your job execution time. You can check the spark streaming doc (http://spark.apache.org/docs/latest/streaming-programming-guide.html), "Monitoring Applications" section, for the details. Thanks Jerry From: Rafeeq S [mailto:rafeeq.ec...@gmail.com] Sent: Thursday, September 18, 2014 2:43 PM To: u...@spark.incubator.apache.org Subject: Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed.. Hi, I am testing a kafka-spark streaming application which throws the error below after a few seconds; the configuration below is used for the spark streaming test environment.

kafka version: 0.8.1
spark version: 1.0.1
SPARK_MASTER_MEMORY=1G
SPARK_DRIVER_MEMORY=1G
SPARK_WORKER_INSTANCES=1
SPARK_EXECUTOR_INSTANCES=1
SPARK_WORKER_MEMORY=1G
SPARK_EXECUTOR_MEMORY=1G
SPARK_WORKER_CORES=2
SPARK_EXECUTOR_CORES=1

ERROR: 14/09/12 17:30:23 WARN TaskSetManager: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-4-1410542878200 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:33) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:74) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Please suggest an answer. Regards, Rafeeq S (“What you do is what matters, not what you think or say or plan.”)
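Besides the WebUI, the same numbers can be watched programmatically. A sketch using the StreamingListener API of that era's Spark 1.x releases (field names assumed from that API; ssc is the StreamingContext):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Log per-batch scheduling and processing delay; if their sum regularly
// exceeds the batch duration, input blocks can expire before their job runs.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch ${info.batchTime}: scheduling=${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing=${info.processingDelay.getOrElse(-1L)} ms")
  }
})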
RE: JMXSink for YARN deployment
Hi, I'm guessing the problem is that the driver or executor cannot find the metrics.properties configuration file in the YARN container, so the metrics system cannot load the right sinks. Thanks Jerry From: Vladimir Tretyakov [mailto:vladimir.tretya...@sematext.com] Sent: Thursday, September 11, 2014 7:30 PM To: user@spark.apache.org Subject: JMXSink for YARN deployment Hello, we at Sematext (https://apps.sematext.com/) are writing a monitoring tool for Spark, and we came across one question: how to enable JMX metrics for a YARN deployment? We put *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink into the file $SPARK_HOME/conf/metrics.properties, but it doesn't work. Everything works in Standalone mode, but not in YARN mode. Can somebody help? Thx! PS: I've also found https://stackoverflow.com/questions/23529404/spark-on-yarn-how-to-send-metrics-to-graphite-sink/25786112, without an answer.
RE: JMXSink for YARN deployment
I think you can try to use "spark.metrics.conf" to manually specify the path of metrics.properties, but the prerequisite is that each container can find this file in its local FS, because the file is loaded locally. Besides, I think this is a kind of workaround; a better way would be to fix this through some other solution. Thanks Jerry From: Vladimir Tretyakov [mailto:vladimir.tretya...@sematext.com] Sent: Thursday, September 11, 2014 10:08 PM Cc: user@spark.apache.org Subject: Re: JMXSink for YARN deployment Hi Shao, thx for the explanation. Any ideas how to fix it? Where should I put the metrics.properties file?
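Jerry's workaround in code form, as a sketch; the path is hypothetical and must resolve on every node (or be shipped into each YARN container), because each process loads the file locally:

import org.apache.spark.SparkConf

// Point the metrics system at an explicit properties file.
val conf = new SparkConf()
  .set("spark.metrics.conf", "/etc/spark/metrics.properties") // hypothetical path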
RE: Spark streaming: size of DStream
Hi, Is there any specific scenario in which you need to know the number of RDDs in the DStream? To my knowledge, a DStream generates one RDD in each batch duration; some old RDDs are remembered for windowing-like functions and are removed when they become useless. The hashmap generatedRDDs in DStream.scala contains the RDDs you want, though you cannot access it from your app. Besides, the count() API returns the number of records in each of this DStream's RDDs, not the number of RDDs; the number of RDDs should always be 1 as I understand it. Thanks Jerry -Original Message- From: julyfire [mailto:hellowe...@gmail.com] Sent: Tuesday, September 09, 2014 2:42 PM To: u...@spark.incubator.apache.org Subject: Spark streaming: size of DStream I want to implement the following logic:

val stream = getFlumeStream() // a DStream
if (size_of_stream > 0) { // if the DStream contains some RDD
  stream.someTransformation
}

stream.count() can figure out the number of RDDs in a DStream, but it returns a DStream[Long] and can't be compared with a number. Does anyone know how to get the number of RDDs in a DStream? Thanks!
RE: Spark streaming: size of DStream
Hi, I think every received stream will generate an RDD in each batch duration, even if there is no data fed in (an empty RDD will be generated). So you cannot use the number of RDDs to judge whether any data was received. One way is to do the check inside DStream.foreachRDD(), like:

a.foreachRDD { r =>
  if (r.count() == 0) {
    // do something
  } else {
    // do some other things
  }
}

You can try it. Thanks Jerry -Original Message- From: julyfire [mailto:hellowe...@gmail.com] Sent: Tuesday, September 09, 2014 3:42 PM To: u...@spark.incubator.apache.org Subject: RE: Spark streaming: size of DStream Hi Jerry, Thanks for your reply. I use spark streaming to receive the flume stream; then I need to make a judgement: in each batchDuration, if the received stream has data, I should do something; if there is no data, do the other thing. I thought count() could give me the measure, but it returns a DStream, not a number. So is there a way to achieve this case?
RE: Spark streaming: size of DStream
I think you should clarify some things in Spark Streaming: 1. The closure in map runs on the remote side, so modifying the count var only takes effect on the remote side; you will always get -1 on the driver side. 2. The code in the closure in foreachRDD is lazily executed in each batch duration, while the if (...) code outside the closure is executed once, immediately, and never again, so your code logic is wrong as expected. 3. I don't think you need to judge whether there is data fed in before doing transformations; you can directly transform on the DStream even if no data was injected in this batch duration: it's only an empty transformation, with no extra overhead. Thanks Jerry -Original Message- From: julyfire [mailto:hellowe...@gmail.com] Sent: Tuesday, September 09, 2014 4:20 PM To: u...@spark.incubator.apache.org Subject: RE: Spark streaming: size of DStream I'm sorry, I had some errors in my code; updated here:

var count = -1L // a global variable in the main object
val currentBatch = some_DStream
val countDStream = currentBatch.map(o => {
  count = 0L // reset the count variable in each batch
  o
})
countDStream.foreachRDD(rdd => count += rdd.count())
if (count > 0) {
  currentBatch.map(...).someOtherTransformation
}

Two problems: 1. the variable count just keeps accumulating and is never reset in each batch 2. if (count > 0) is only evaluated at the beginning of running the program, so the next statement will never run. Can you give me some suggestions? Thanks
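Putting Jerry's three points together, the per-batch branch belongs inside foreachRDD, where rdd.count() is evaluated on the cluster for that batch; a sketch (the map is a placeholder for the real work):

// Runs once per batch; no driver-side mutable counter is needed.
currentBatch.foreachRDD { rdd =>
  if (rdd.count() > 0) {                  // this batch actually has data
    val transformed = rdd.map(_.toString) // placeholder transformation
    transformed.take(10).foreach(println)
  }
  // an empty batch simply falls through
}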
RE: Setting Kafka parameters in Spark Streaming
Hi Hemanth, I think there is a bug in this API in Spark 0.8.1, so you will meet this exception when using Java code with this API; the bug is fixed in the latest version, as you can see in the patch (https://github.com/apache/spark/pull/1508). But it's only for Kafka 0.8+; as you still use kafka 0.7, you can modify the Spark code according to this patch and rebuild. I still highly recommend using the latest versions of Spark and Kafka; there are lots of improvements in the streaming field. Thanks Jerry From: Hemanth Yamijala [mailto:yhema...@gmail.com] Sent: Tuesday, September 09, 2014 12:49 AM To: user@spark.apache.org Subject: Setting Kafka parameters in Spark Streaming Hi, I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes when creating the Kafka DStream. The only API that seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](
    typeClass: Class[T],
    decoderClass: Class[D],
    kafkaParams: Map[String, String],
    topics: Map[String, Integer],
    storageLevel: StorageLevel)

I tried to call this as:

context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK())

However, this is causing an exception like: java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154) Can anyone provide help on how to set these parameters? Thanks Hemanth
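For comparison, on a current Spark with Kafka 0.8 the same parameter goes straight into kafkaParams via the Scala API; a sketch (all values are placeholders, ssc is the StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "zookeeper.connect"       -> "zkhost:2181",
  "group.id"                -> "my-group",
  "fetch.message.max.bytes" -> "10485760") // 10 MB, illustrative
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)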
RE: Setting Kafka parameters in Spark Streaming
As you mentioned in another mail that you hope to port the latest version of Spark onto Kafka 0.7, there are some notes you should take care of: 1. Kafka 0.7 can only be compiled with Scala 2.8, while Spark is now compiled with Scala 2.10, and there is no binary compatibility between these two Scala versions, so you have to modify the Kafka code as Spark previously did to fix the Scala problem. 2. The high-level consumer API changed between Kafka 0.7 and 0.8, so you have to modify KafkaInputDStream in Spark Streaming. Thanks Jerry From: Hemanth Yamijala [mailto:yhema...@gmail.com] Sent: Tuesday, September 09, 2014 1:19 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: Re: Setting Kafka parameters in Spark Streaming Thanks, Shao, for providing the necessary information. Hemanth
RE: Trying to run SparkSQL over Spark Streaming
Hi, StreamSQL (https://github.com/thunderain-project/StreamSQL) is a POC project based on Spark that combines the power of Catalyst and Spark Streaming, to offer people the ability to run SQL on top of DStream as you wanted. This keeps the same semantics as SparkSQL and offers a SchemaDStream on top of DStream; you don't need to do tricky things like extracting the rdd to register it as a table. The other parts are the same as Spark. Thanks Jerry -Original Message- From: praveshjain1991 [mailto:praveshjain1...@gmail.com] Sent: Thursday, August 21, 2014 2:25 PM To: u...@spark.incubator.apache.org Subject: Re: Trying to run SparkSQL over Spark Streaming Oh right. Got it. Thanks. Also found this link in that discussion: https://github.com/thunderain-project/StreamSQL Does this provide more features than Spark?
RE: NullPointerException from '.count.foreachRDD'
Hi, I don't think there's an NPE issue when using DStream.count(), even if there is no data fed into Spark Streaming. I tested using Kafka in my local settings, and both cases are OK, with and without data consumed. Actually you can see the details in ReceiverInputDStream: even if there is no data in this batch duration, it will generate an empty BlockRDD, so the map() and transform() in the count() operator will never meet an NPE. I think the problem may lie in your customized InputDStream; you should make sure it generates an empty RDD even when there is no data fed in. Thanks Jerry -Original Message- From: anoldbrain [mailto:anoldbr...@gmail.com] Sent: Wednesday, August 20, 2014 4:13 PM To: u...@spark.incubator.apache.org Subject: Re: NullPointerException from '.count.foreachRDD' Looking at the source code of DStream.scala:

/**
 * Return a new DStream in which each RDD has a single element generated by counting each RDD
 * of this DStream.
 */
def count(): DStream[Long] = {
  this.map(_ => (null, 1L))
      .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
      .reduceByKey(_ + _)
      .map(_._2)
}

transform is the line throwing the NullPointerException. Can anyone give some hints as to what would cause _ to be null (it is indeed null)? This only happens when there is no data to process. When there's data, no NullPointerException is thrown, and all the processing/counting proceeds correctly. I am using my custom InputDStream. Could it be that this is the source of the problem?
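Jerry's last point as a sketch: a custom InputDStream should hand back an empty RDD for an idle batch rather than null. Everything below except the InputDStream contract itself is hypothetical (in particular pollSource):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class PollingDStream(ssc_ : StreamingContext) extends InputDStream[String](ssc_) {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def compute(validTime: Time): Option[RDD[String]] = {
    val records: Seq[String] = pollSource(validTime) // hypothetical fetch
    // Returning Some(empty RDD) instead of null means downstream operators,
    // like the transform inside count(), always receive a real RDD.
    Some(ssc_.sparkContext.makeRDD(records))
  }
  private def pollSource(t: Time): Seq[String] = Seq.empty // placeholder
}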
RE: OutOfMemory Error
Hi Meethu, The spark.executor.memory is the Java heap size of the forked executor process. Increasing spark.executor.memory actually increases the runtime heap size of the executor process. For the details of Spark configurations, you can check: http://spark.apache.org/docs/latest/configuration.html Thanks Jerry From: MEETHU MATHEW [mailto:meethu2...@yahoo.co.in] Sent: Wednesday, August 20, 2014 4:48 PM To: Akhil Das; Ghousia Cc: user@spark.apache.org Subject: Re: OutOfMemory Error Hi, How do I increase the heap size? What is the difference between spark executor memory and heap size? Thanks & Regards, Meethu M On Monday, 18 August 2014 12:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I believe spark.shuffle.memoryFraction is the one you are looking for. spark.shuffle.memoryFraction: the fraction of the Java heap to use for aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills are frequent, consider increasing this value at the expense of spark.storage.memoryFraction. You can give it a try. Thanks Best Regards On Mon, Aug 18, 2014 at 12:21 PM, Ghousia ghousia.ath...@gmail.com wrote: Thanks for the answer Akhil. We are right now getting rid of this issue by increasing the number of partitions, and we are persisting RDDs to DISK_ONLY. But the issue is with heavy computations within an RDD. It would be better if we had the option of spilling the intermediate transformation results to local disk (only in case memory consumption is high). Do we have any such option available with Spark? If increasing the partitions is the only way, then one might end up with OutOfMemory errors when working with certain algorithms where the intermediate result is huge. On Mon, Aug 18, 2014 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Ghousia, You can try the following: 1. Increase the heap size (https://spark.apache.org/docs/0.9.0/configuration.html) 2. Increase the number of partitions (http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine) 3. You could try persisting the RDD to use DISK_ONLY (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) Thanks Best Regards On Mon, Aug 18, 2014 at 10:40 AM, Ghousia Taj ghousia.ath...@gmail.com wrote: Hi, I am trying to implement machine learning algorithms on Spark. I am working on a 3 node cluster, with each node having 5GB of memory. Whenever I am working with a slightly larger number of records, I end up with an OutOfMemory error. The problem is, even if the number of records is only slightly high, the intermediate result from a transformation is huge and this results in an OutOfMemory error. To overcome this, we are partitioning the data such that each partition has only a few records. Is there any better way to fix this issue? Something like spilling the intermediate data to local disk? Thanks, Ghousia.
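In code, the knobs discussed in this thread are set independently on the SparkConf; a sketch with illustrative values:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "4g")          // heap size of each executor JVM
  .set("spark.shuffle.memoryFraction", "0.3")  // heap share for shuffle aggregation
  .set("spark.storage.memoryFraction", "0.5")  // heap share for persisted blocks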
RE: Hi
Hi, Actually several Java task threads run in a single executor, not processes, so each executor has only one JVM runtime, which is shared by the different task threads. Thanks Jerry From: rapelly kartheek [mailto:kartheek.m...@gmail.com] Sent: Wednesday, August 20, 2014 5:29 PM To: user@spark.apache.org Subject: Hi Hi, I have this doubt: I understand that each java process runs on a different JVM instance. Now, if I have a single executor on my machine and run several java processes, then there will be several JVM instances running. Now, process_local means the data is located on the same JVM as the task that is launched. But the memory associated with the entire executor is the same. Then how does this memory get distributed across the JVMs? I mean, how does this memory get associated with multiple JVMs? Thank you!!! -karthik
RE: Data loss - Spark streaming and network receiver
I think Spark Streaming currently lacks a data acknowledging mechanism for when data is stored and replicated in BlockManager, so potentially data can be lost even after it has been pulled from Kafka: say the data is stored just in the BlockGenerator, not yet in the BlockManager, while in the meantime Kafka itself commits the consumer offset; if the node fails at this point, from Kafka's point of view this part of the data has been fed into Spark Streaming, but actually it has not yet been processed, so potentially this part of the data will never be processed again, unless you read the whole partition again. To solve this potential data loss problem, Spark Streaming needs to offer a data acknowledging mechanism, so a custom Receiver can use this acknowledgement to do checkpointing or recovery, like Storm. Besides, driver failure is another story that needs to be carefully considered. So currently it is hard to guarantee no data loss in Spark Streaming; it still needs to improve at some points ☺. Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Tuesday, August 19, 2014 10:47 AM To: Wei Liu Cc: user Subject: Re: Data loss - Spark streaming and network receiver Hi Wei, On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com wrote: Since our application cannot tolerate losing customer data, I am wondering what is the best way for us to address this issue. 1) We are thinking of writing application-specific logic to address the data loss. To us, the problem seems to be caused by the Kinesis receivers advancing their checkpoint before we know for sure the data has been replicated. For example, we can do another checkpoint ourselves to remember the Kinesis sequence number for data that has been processed by spark streaming. When the Kinesis receiver is restarted due to worker failures, we restart it from the checkpoint we tracked. This sounds pretty much to me like the way Kafka does it. So, I am not saying that the stock KafkaReceiver does what you want (it may or may not), but it should be possible to update the offset (corresponds to sequence number) in Zookeeper only after data has been replicated successfully. I guess replacing Kinesis with Kafka is not an option for you, but you may consider pulling the Kinesis data into Kafka before processing it with Spark? Tobias
RE: spark streaming - lamda architecture
Hi Ali,

Maybe you can take a look at Twitter's Summingbird project (https://github.com/twitter/summingbird), which is currently one of the few open-source implementations of the lambda architecture. There is an ongoing sub-project called summingbird-spark; that might be the one you want, and it may help you.

Thanks
Jerry

-----Original Message-----
From: salemi [mailto:alireza.sal...@udo.edu]
Sent: Friday, August 15, 2014 11:25 AM
To: u...@spark.incubator.apache.org
Subject: Re: spark streaming - lamda architecture

Below is what I understand by lambda architecture. The batch layer provides the historical data and the speed layer provides the real-time view! All data entering the system is dispatched to both the batch layer and the speed layer for processing. The batch layer has two functions: (i) managing the master dataset (an immutable, append-only set of raw data), and (ii) pre-computing the batch views. The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only. The serving layer indexes the batch views so that they can be queried in a low-latency, ad-hoc way. Any incoming query can be answered by merging results from batch views and real-time views.

In my system I have events coming in from Kafka sources, and currently we need to process 10,000 messages per second, write them out to HDFS, and make them available to be queried by a serving layer. What would be your suggestion for architecturally solving this issue? Roughly how large a solution would be needed for the proposed architecture?

Thanks,
Ali

Tathagata Das wrote:
Can you be a bit more specific about what you mean by lambda architecture?

On Thu, Aug 14, 2014 at 2:27 PM, salemi <alireza.salemi@...> wrote:
Hi,
How would you implement the batch layer of the lambda architecture with Spark / Spark Streaming?

Thanks,
Ali
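As a starting point for Ali's question, a minimal Scala sketch of both layers on Spark Streaming (the ZooKeeper address, topic, paths, and the aggregation are all illustrative, and the Kafka receiver shown is the high-level 0.8 API):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("lambda-sketch")
val ssc = new StreamingContext(conf, Seconds(1))

val events = KafkaUtils
  .createStream(ssc, "zk1:2181", "lambda-group", Map("events" -> 4))
  .map(_._2)

// Batch layer: append the immutable raw events to the master dataset on HDFS;
// batch views are then pre-computed over these files by a separate batch job.
events.foreachRDD { (rdd, time) =>
  rdd.saveAsTextFile(s"hdfs:///master/events-${time.milliseconds}")  // hypothetical path
}

// Speed layer: a low-latency view over recent data only,
// here the per-key counts of the last 60 seconds, updated every 10.
val realtimeView = events
  .map(e => (e, 1L))
  .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))
realtimeView.print()

ssc.start()
ssc.awaitTermination()

In practice you would write the raw events to HDFS at a coarser interval than the batch duration, since one directory per second produces many small files.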
RE: Spark stream data from kafka topics and output as parquet file on HDFS
Hi Rafeeq,

I think the current Spark Streaming API can offer you the ability to fetch data from Kafka and store it to another external store. If you do not care about managing the consumer offsets manually, there is no need to use the low-level API (SimpleConsumer). For Kafka 0.8.1 compatibility, you can try to modify the pom file and rebuild Spark; mostly I think it will work. As for Parquet files: if Parquet offers its own OutputFormat extending Hadoop's OutputFormat, Spark can write data into Parquet files the same way it writes sequence files or text files. You can do this as:

dstream.foreachRDD { rdd => rdd.saveAsHadoopFile(…) }

and specify the OutputFormat you want.

Thanks
Jerry

From: rafeeq s [mailto:rafeeq.ec...@gmail.com]
Sent: Tuesday, August 05, 2014 5:37 PM
To: Dibyendu Bhattacharya
Cc: u...@spark.incubator.apache.org
Subject: Re: Spark stream data from kafka topics and output as parquet file on HDFS

Thanks Dibyendu.
1. Spark itself has an API jar for Kafka; do we still require manual offset management (using the SimpleConsumer concept) and a manual consumer?
2. Kafka Spark Consumer is implemented against Kafka 0.8.0; can we use it with Kafka 0.8.1?
3. How do we use Kafka Spark Consumer to produce output as Parquet files on HDFS?
Please give your suggestions.

Regards,
Rafeeq S
("What you do is what matters, not what you think or say or plan.")

On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
You can try this Kafka Spark Consumer which I recently wrote. It uses the low-level Kafka consumer: https://github.com/dibbhatt/kafka-spark-consumer

Dibyendu

On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s <rafeeq.ec...@gmail.com> wrote:
Hi,

I am new to Apache Spark and trying to develop a Spark Streaming program to stream data from Kafka topics and output it as Parquet files on HDFS. Please share a sample reference program that streams data from Kafka topics and outputs it as Parquet files on HDFS. Thanks in advance.

Regards,
Rafeeq S
("What you do is what matters, not what you think or say or plan.")
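To make the OutputFormat route concrete, a sketch assuming parquet-avro's AvroParquetOutputFormat is on the classpath and the stream has already been parsed into Avro GenericRecords; the output directory layout is made up for the example:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.dstream.DStream
import parquet.avro.AvroParquetOutputFormat

def writeAsParquet(records: DStream[GenericRecord], schema: Schema, outputDir: String): Unit = {
  records.foreachRDD { (rdd, time) =>
    val job = Job.getInstance()                     // carries the Parquet write config
    AvroParquetOutputFormat.setSchema(job, schema)  // map the Avro schema to Parquet
    rdd.map(record => (null.asInstanceOf[Void], record))  // Parquet expects (Void, record) pairs
      .saveAsNewAPIHadoopFile(
        s"$outputDir/batch-${time.milliseconds}",   // one output directory per batch
        classOf[Void],
        classOf[GenericRecord],
        classOf[AvroParquetOutputFormat],
        job.getConfiguration)
  }
}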
RE: spark.streaming.unpersist and spark.cleaner.ttl
Hi Haopu,

Please see the inline comments.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: spark.streaming.unpersist and spark.cleaner.ttl

I have a DStream receiving data from a socket. I'm using local mode. I set spark.streaming.unpersist to false and leave spark.cleaner.ttl at its default (infinite). I can see files for input and shuffle blocks under the spark.local.dir folder, and the size of the folder keeps increasing, although the JVM's memory usage seems to be stable.

[question] In this case, the input RDDs are persisted but don't fit into memory, so they are written to disk, right? And where can I see the details about these RDDs? I don't see them in the web UI.
[answer] Yes, if memory is not enough to hold the input RDDs, the data will be flushed to disk, because the default storage level is MEMORY_AND_DISK_SER_2, as you can see in StreamingContext.scala. Actually you cannot see the input RDDs in the web UI; you can only see cached RDDs there.

Then I set spark.streaming.unpersist to true, and the size of the spark.local.dir folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change spark.cleaner.ttl, which component is doing the cleanup? And what's the difference if I set spark.cleaner.ttl to some duration in this case?
[answer] If you set spark.streaming.unpersist to true, old unused RDDs will be deleted, as you can see in DStream.scala. spark.cleaner.ttl, on the other hand, is the timer-based Spark cleaner, which cleans not only streaming data but also broadcast, shuffle and other data.

Thank you!
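A minimal sketch of the two settings under discussion (the values are illustrative only):

import org.apache.spark.SparkConf

// Unpersist streaming RDDs as soon as they are no longer needed, and let
// the timer-based cleaner expire metadata older than one hour.
val conf = new SparkConf()
  .setAppName("streaming-cleanup")
  .set("spark.streaming.unpersist", "true")
  .set("spark.cleaner.ttl", "3600")  // seconds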
RE: spark.streaming.unpersist and spark.cleaner.ttl
Yeah, the document may not be precisely aligned with the latest code, so the best way is to check the code.

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Wednesday, July 23, 2014 5:56 PM
To: user@spark.apache.org
Subject: RE: spark.streaming.unpersist and spark.cleaner.ttl

Jerry, thanks for the response.

Regarding the default storage level of DStreams, it looks like Spark's documentation is wrong. This link: http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning mentions: "Default persistence level of DStreams: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, StorageLevel.MEMORY_ONLY_SER for DStreams compared to StorageLevel.MEMORY_ONLY for RDDs). Even though keeping the data serialized incurs higher serialization/deserialization overheads, it significantly reduces GC pauses."

I will take a look at DStream.scala although I have no Scala experience.
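Whatever the documented default turns out to be, you can sidestep the question by passing the storage level explicitly; a minimal sketch (the app name, host, and port are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("explicit-storage-level")
val ssc = new StreamingContext(conf, Seconds(1))
// socketTextStream accepts an explicit storage level, so the stream's
// persistence behavior no longer depends on what the default happens to be.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)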
RE: Executor metrics in spark application
Hi Denes,

I think you can register your customized metrics source into the metrics system through metrics.properties; you can take metrics.properties.template as a reference. Basically you can do as follows if you want to monitor on the executor side:

executor.source.accumulator.class=xx.xx.xx.your-customized-metrics-source

I think the code below can only register the metrics source on the client (driver) side:

SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics)

BTW, it's not a good choice to register through MetricsSystem directly; it would be nicer to register through configuration. Also, you can enable the console sink to verify whether the source is registered or not.

Thanks
Jerry

-----Original Message-----
From: Denes [mailto:te...@outlook.com]
Sent: Tuesday, July 22, 2014 2:02 PM
To: u...@spark.incubator.apache.org
Subject: Re: Executor metrics in spark application

I'm also pretty interested in how to create custom sinks in Spark. I'm using it with Ganglia and the normal metrics from the JVM source do show up. I tried to create my own metric based on Issac's code, but it does not show up in Ganglia. Does anyone know where the problem is? Here's the code snippet:

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.Accumulator
import org.apache.spark.metrics.source.Source

// A metrics source that exposes the current value of an accumulator as a gauge.
class AccumulatorSource(accumulator: Accumulator[Long], name: String) extends Source {
  override val sourceName = name
  override val metricRegistry = new MetricRegistry()
  metricRegistry.register(MetricRegistry.name("accumulator", name),
    new Gauge[Long] {
      override def getValue: Long = accumulator.value
    })
}

and then in the main:

val longAccumulator = sc.accumulator[Long](0)
val accumulatorMetrics = new AccumulatorSource(longAccumulator, "counters.accumulator")
SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics)
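For the verification step Jerry mentions, a minimal metrics.properties sketch (the source class name is a placeholder for wherever AccumulatorSource actually lives):

# Register the custom source on executors (placeholder class name).
# Note: a source registered this way is instantiated reflectively,
# so it needs a no-arg constructor; that limitation is exactly what
# the follow-up message below runs into.
executor.source.accumulator.class=com.example.metrics.AccumulatorSource

# Print all registered metrics to stdout every 10 seconds,
# which makes it easy to check that the source was picked up.
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds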
RE: number of Cached Partitions v.s. Total Partitions
Yes, it's normal when memory is not enough to hold the third partition, as you can see in your attached picture.

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, July 22, 2014 3:09 PM
To: user@spark.apache.org
Subject: number of Cached Partitions v.s. Total Partitions

Hi,

I'm using local mode and read a text file as an RDD using the JavaSparkContext.textFile() API, then call the cache() method on the resulting RDD. Looking at the Storage information, I find the RDD has 3 partitions but only 2 of them have been cached. Is this normal behavior? I assumed either all of the partitions would be cached or none of them. If I'm wrong, in what cases is the number of cached partitions less than the total number of partitions?

[attached screenshot of the web UI Storage tab]
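If you want every partition retained, a minimal sketch (the path is made up; sc is an existing SparkContext):

import org.apache.spark.storage.StorageLevel

// cache() is MEMORY_ONLY: partitions that don't fit are simply left uncached
// and recomputed when needed. MEMORY_AND_DISK spills the overflow partitions
// to disk instead, so all partitions show up as cached.
val rdd = sc.textFile("hdfs:///input/data.txt")  // hypothetical path
rdd.persist(StorageLevel.MEMORY_AND_DISK)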
RE: Executor metrics in spark application
Yeah, now I see your purpose. The original design of customized metrics sources focused on self-contained sources, but you need to rely on an outer variable, so the way you mentioned may be the only way to register it. Besides, since you cannot see the source in Ganglia, I think you can enable the console sink to verify the output. Also, it seems you want to register this source in the driver, so you need to enable the Ganglia sink on the driver side and make sure the Ganglia client can connect to your driver.

Thanks
Jerry

-----Original Message-----
From: Denes [mailto:te...@outlook.com]
Sent: Tuesday, July 22, 2014 6:38 PM
To: u...@spark.incubator.apache.org
Subject: RE: Executor metrics in spark application

Hi Jerry,

I know that way of registering a metrics source, but it seems to defeat the whole purpose. I'd like to define a source whose value is set within the application, for example the number of parsed messages. If I register it in metrics.properties, how can I obtain the instance (or instances)? How can I set the property? Is there a way to read accumulator values from a Source?
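For the driver-side Ganglia sink Jerry suggests, a metrics.properties sketch (the host and port are made up, and the GangliaSink class ships in the separate spark-ganglia-lgpl module, which must be built in):

# Report driver metrics to Ganglia (illustrative host/port).
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
driver.sink.ganglia.host=ganglia.example.com
driver.sink.ganglia.port=8649
driver.sink.ganglia.period=10
driver.sink.ganglia.unit=seconds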
RE: Some question about SQL and streaming
Actually we have a POC project which shows the power of combining Spark Streaming and Catalyst: it can run SQL on top of Spark Streaming and produce a SchemaDStream. You can take a look at it: https://github.com/thunderain-project/StreamSQL

Thanks
Jerry

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, July 11, 2014 10:17 AM
To: user@spark.apache.org
Subject: Re: Some question about SQL and streaming

Yeah, the right solution is to have something like SchemaDStream, where the schema of all the SchemaRDDs generated by it can be stored. Something I really would like to see happen in the future :)

TD

On Thu, Jul 10, 2014 at 6:37 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
Hi,

I think it would be great if we could do the string parsing only once and then just apply the transformation for each interval (reducing the processing overhead for short intervals). Also, one issue with the approach above is that transform() has the following signature:

def transform(transformFunc: RDD[T] => RDD[U]): DStream[U]

and therefore, in my example

val result = lines.transform((rdd, time) => {
  // execute statement
  rdd.registerAsTable("data")
  sqlc.sql(query)
})

the variable `result` is of type DStream[Row]. That is, the meta-information from the SchemaRDD is lost and, from what I understand, there is then no way to learn about the column names of the returned data, as this information is only encoded in the SchemaRDD. I would love to see a fix for this.

Thanks
Tobias
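For context, a self-contained sketch of the transform() pattern Tobias describes, against the Spark 1.0-era SQL API; the record type, socket source, and query are illustrative, and only the pattern itself comes from the thread:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Record(word: String, count: Int)

val sc = new SparkContext(new SparkConf().setAppName("streaming-sql"))
val ssc = new StreamingContext(sc, Seconds(10))
val sqlc = new SQLContext(sc)
import sqlc.createSchemaRDD  // implicit RDD[Record] -> SchemaRDD

val lines = ssc.socketTextStream("localhost", 9999)
val records = lines.map(_.split(",")).map(a => Record(a(0), a(1).toInt))

// Re-register each batch as a table and run the query once per interval.
// The result is a DStream[Row]: the SchemaRDD's column names are lost,
// which is exactly the limitation Tobias describes.
val result = records.transform { rdd =>
  rdd.registerAsTable("data")
  sqlc.sql("SELECT word, SUM(count) FROM data GROUP BY word")
}
result.print()

ssc.start()
ssc.awaitTermination()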
RE: Some question about SQL and streaming
No specific plans to do so, since there would be some functional loss, like the time-based windowing functions that are important for streaming SQL. Also, keeping compatible with the fast-growing Spark SQL is quite hard. So no clear plans to submit it upstream.

-Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Friday, July 11, 2014 10:47 AM
To: user@spark.apache.org
Subject: Re: Some question about SQL and streaming

Hi,

On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai <saisai.s...@intel.com> wrote:
Actually we have a POC project which shows the power of combining Spark Streaming and Catalyst; it can run SQL on top of Spark Streaming and produce a SchemaDStream. You can take a look at it: https://github.com/thunderain-project/StreamSQL

Wow, that looks great! Any plans to get this code (or functionality) merged into Spark?

Tobias