Spark Streaming from Kafka
Hi,

Just wondering if you've seen the following error when reading from Kafka:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
        at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
        at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
        at kafka.utils.Logging$class.$init$(Logging.scala:29)
        at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24)
        at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:78)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 18 more

Thanks,
Harold
How to retrieve spark context when hiveContext is used in spark streaming
Hi,

I'm trying to get hold of the Spark context from the HiveContext or the StreamingContext. I have two pieces of code, one in core Spark and one in Spark Streaming: the plain Spark-with-Hive version gives me the context, while the Spark Streaming-with-Hive version prints null. Please help me figure out how to make this code work. Thanks in advance.

// core spark which gives context

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    object Trail extends App {
      val conf = new SparkConf(false)
        .setMaster("local[*]")
        .setAppName("Spark Streamer")
        .set("spark.logConf", "true")
        .set("spark.cassandra.connection.host", "127.0.0.1")
        .set("spark.cleaner.ttl", "300")
      val context = new SparkContext(conf)
      val hiveContext = new HiveContext(context)
      import com.dgm.Trail.hiveContext._

      context textFile "logs/log1.txt" flatMap { data =>
        val Array(id, signals) = data split '|'
        signals split '' map { signal =>
          val Array(key, value) = signal split '='
          Signal(id, key, value)
        }
      } registerTempTable "signals"
      hiveContext cacheTable "signals"

      val signalRows = hiveContext sql "select id from signals where key='id' and value='123'" map rts cache()
      signalRows.foreach { x =>
        println(signalRows.context)
      }
    }

// spark streaming code which prints null

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object Trail extends App {
      val conf = new SparkConf(false)
        .setMaster("local[*]")
        .setAppName("Spark Streamer")
        .set("spark.logConf", "true")
        .set("spark.cassandra.connection.host", "127.0.0.1")
        .set("spark.cleaner.ttl", "300")
      val streamingContext = new StreamingContext(conf, Seconds(10))
      val context = streamingContext.sparkContext

      val kafkaParams = Map(
        "zookeeper.connect" -> "localhost",
        "group.id" -> "spark_stream",
        "zookeeper.connection.timeout.ms" -> "1",
        "auto.offset.reset" -> "smallest"
      )

      val stream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
        streamingContext, kafkaParams, Map("tracker" -> 2), StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

      val signalsDStream = stream flatMap { data =>
        val Array(id, signals) = data split '|'
        signals split '' map { signal =>
          val Array(key, value) = signal split '='
          Signal(id, key, value)
        }
      }

      signalsDStream foreachRDD { rdds =>
        val hiveContext = new HiveContext(streamingContext.sparkContext)
        import hiveContext._
        rdds registerTempTable "signals"
        hiveContext cacheTable "signals"
        val signalRows = hiveContext sql "select id from signals where key='id' and value='123'" map rts cache()
        signalRows.foreach { x =>
          // println(streamingContext.sparkContext) causes serialization error
          println(hiveContext.sparkContext)
        }
      }

      streamingContext.start()
    }

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-retrive-spark-context-when-hiveContext-is-used-in-sparkstreaming-tp17609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: FileNotFoundException in appcache shuffle files
Hi, Ryan

We have met similar errors and increasing executor memory solved it. Though I am not sure about the detailed reason, it might be worth a try.

On Wed, Oct 29, 2014 at 1:34 PM, Ryan Williams [via Apache Spark User List] ml-node+s1001560n17605...@n3.nabble.com wrote:

My job is failing with the following error:

14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
        java.io.FileOutputStream.open(Native Method)
        java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)
        org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
        org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:56)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 task-1543 failures are a few instances of this failure on another task. Here is the entire App Master stdout dump https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0 (~2MB; stack traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

- val A = [read a file in 1419 shards]
  - the file is 177GB compressed but ends up being ~5TB uncompressed / hydrated into scala objects (I think; see below for more discussion on this point).
  - some relevant Spark options:
    - spark.default.parallelism=2000
    - --master yarn-client
    - --executor-memory 50g
    - --driver-memory 10g
    - --num-executors 100
    - --executor-cores 4
- A.repartition(3000)
  - 3000 was chosen in an attempt to mitigate shuffle-disk-spillage that previous job attempts with 1000 or 1419 shards were mired in
- A.persist()
- A.count() // succeeds
  - screenshot of web UI with stats: http://cl.ly/image/3e130w3J1B2v
  - I don't know why each task reports 8 TB of Input; that metric seems like it is always ludicrously high and I don't pay attention to it typically.
- Each task shuffle-writes 3.5GB, for a total of 4.9TB
  - Does that mean that 4.9TB is the uncompressed size of the file that A was read from?
  - 4.9TB is pretty close to the total amount of memory I've configured the job to use: (50GB/executor) * (100 executors) ~= 5TB.
  - Is that a coincidence, or are my executors shuffle-writing an amount equal to all of their memory for some reason?
- val B = A.groupBy(...).filter(_._2.size == 2).map(_._2).flatMap(x => x).persist()
  - my expectation is that ~all elements pass the filter step, so B should ~equal to A, just to give a sense of the expected memory blowup.
- B.count()
  - this *fails* while executing .groupBy(...) above

I've found a few discussions of issues whose manifestations look *like* this, but nothing that is obviously the same issue. The closest hit I've seen is "Stage failure in BlockManager..." http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3ccangvg8qtk57frws+kaqtiuz9jsls5qjkxxjxttq9eh2-gsr...@mail.gmail.com%3E on this list on 8/20; some key excerpts:

- likely due to a bug in shuffle file consolidation
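Following up on the executor-memory suggestion at the top of this thread, a minimal sketch of how that knob can be raised when the context is built programmatically; the value is a placeholder, and with spark-submit the equivalent is the --executor-memory flag:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: raise executor memory before the context is created.
    // "8g" is a placeholder -- pick a value that actually fits your nodes.
    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")
      .set("spark.executor.memory", "8g")
    val sc = new SparkContext(conf)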
Fwd: sampling in spark
-- Forwarded message --
From: Chengi Liu chengi.liu...@gmail.com
Date: Tue, Oct 28, 2014 at 11:23 PM
Subject: Re: sampling in spark
To: Davies Liu dav...@databricks.com

Any suggestions.. Thanks

On Tue, Oct 28, 2014 at 12:53 AM, Chengi Liu chengi.liu...@gmail.com wrote:

Is there an equivalent way of doing the following:

    a = [1,2,3,4]
    reduce(lambda x, y: x+[x[-1]+y], a, [0])[1:]

?? The issue with above suggestion is that population is a hefty data structure :-/

On Tue, Oct 28, 2014 at 12:42 AM, Davies Liu dav...@databricks.com wrote:

    _cumm = [p[0]]
    for i in range(1, len(p)):
        _cumm.append(_cumm[-1] + p[i])
    index = set([bisect(_cumm, random.random()) for i in range(k)])

    chosed_x = X.zipWithIndex().filter(lambda (v, i): i in index).map(lambda (v, i): v)
    chosed_y = [v for i, v in enumerate(y) if i in index]

On Tue, Oct 28, 2014 at 12:26 AM, Chengi Liu chengi.liu...@gmail.com wrote:

Oops, the reference for the above code: http://stackoverflow.com/questions/26583462/selecting-corresponding-k-rows-from-matrix-and-vector/26583945#26583945

On Tue, Oct 28, 2014 at 12:26 AM, Chengi Liu chengi.liu...@gmail.com wrote:

Hi,

I have three rdds.. X, y and p. X is a matrix rdd (m x n), y is an (m x 1) dimension vector and p is an (m x 1) dimension probability vector. Now, I am trying to sample k rows from X and corresponding entries in y based on probability vector p.

Here is the python implementation:

    import random
    from bisect import bisect
    from operator import itemgetter

    def sample(population, k, prob):
        def cdf(population, k, prob):
            population = map(itemgetter(1), sorted(zip(prob, population)))
            cumm = [prob[0]]
            for i in range(1, len(prob)):
                cumm.append(cumm[-1] + prob[i])
            return [population[bisect(cumm, random.random())] for i in range(k)]
        return cdf(population, k, prob)
Re: Submitting Spark application through code
And the scala way of doing it would be:

    val sc = new SparkContext(conf)
    sc.addJar("/full/path/to/my/application/jar/myapp.jar")

On Wed, Oct 29, 2014 at 1:44 AM, Shailesh Birari sbir...@wynyardgroup.com wrote:

Yes, this is doable. I am submitting the Spark job using

    JavaSparkContext spark = new JavaSparkContext(sparkMaster, "app name", System.getenv("SPARK_HOME"), new String[] {"application JAR"});

To run this first you have to create the application jar and in above API specify its absolute path. That's all. Run your java application like any other.

Shailesh

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submiting-Spark-application-through-code-tp17452p17553.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
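For completeness, a hedged sketch of the whole Scala flow; the master URL, app name and jar path below are placeholders, and the jar must exist at that path on the machine running the driver:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: build the context programmatically, then ship the application jar
    // to the executors. All values here are placeholders.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")
      .setAppName("my-app")
    val sc = new SparkContext(conf)
    sc.addJar("/full/path/to/my/application/jar/myapp.jar")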
Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
Yes we shade akka to change its protobuf version (If I am not wrong.). Yes, binary compatibility with other akka modules is compromised. One thing you can try is use akka from org.spark-project.akka, I have not tried this and not sure if its going to help you but may be you could exclude the akka spray depends on and use the akka spark depends on. Prashant Sharma On Wed, Oct 29, 2014 at 9:27 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right? Jianshi On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com wrote: Try a version built with Akka 2.2.x Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 3:03 AM *To:* user *Subject:* Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
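One way to attempt the exclusion Prashant describes (untested, as he notes) is at the build level, for example in sbt. The coordinates and versions below are illustrative assumptions, not verified against this setup:

    // build.sbt (sketch): keep a single Akka on the classpath by excluding the
    // Akka that spray would pull in and depending on the Spark-flavoured Akka.
    libraryDependencies ++= Seq(
      "org.spark-project.akka" % "akka-actor_2.10" % "2.3.4-spark",
      "io.spray" % "spray-client" % "1.3.1" excludeAll ExclusionRule(organization = "com.typesafe.akka")
    )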
Re: install sbt
1. Download https://dl.bintray.com/sbt/native-packages/sbt/0.13.6/sbt-0.13.6.zip
2. Extract
3. export PATH=$PATH:/path/to/sbt/bin/sbt

Now you can do all the sbt commands (sbt package etc.)

Thanks
Best Regards

On Tue, Oct 28, 2014 at 9:49 PM, Soumya Simanta soumya.sima...@gmail.com wrote:

sbt is just a jar file. So you really don't need to install anything. Once you run the jar file (sbt-launch.jar) it can download the required dependencies. I use an executable script called sbt that has the following contents:

    SBT_OPTS="-Xms1024M -Xmx2048M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=1024M"
    echo $SBT_OPTS
    java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"

On Tue, Oct 28, 2014 at 12:13 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

If you're just calling sbt from within the spark/sbt folder, it should download and install automatically.

Nick

On Tuesday, October 28, 2014, Ted Yu yuzhih...@gmail.com wrote:

Have you read this? http://lancegatlin.org/tech/centos-6-install-sbt

On Tue, Oct 28, 2014 at 7:54 AM, Pagliari, Roberto rpagli...@appcomsci.com wrote:

Is there a repo or some kind of instruction about how to install sbt for centos?

Thanks,
Re: Saving to Cassandra from Spark Streaming
You need to set the following jar (cassandra-connector http://central.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha3/spark-cassandra-connector_2.10-1.1.0-alpha3.jar) in the classpath like:

    ssc.sparkContext.addJar("/path/to/spark-cassandra-connector_2.10-1.1.0-alpha3.jar")

Thanks
Best Regards

On Tue, Oct 28, 2014 at 10:09 PM, Gerard Maas gerard.m...@gmail.com wrote:

Looks like you're having some classpath issues. Are you providing your spark-cassandra-driver classes to your job? sparkConf.setJars(Seq(jars...)) ?

On Tue, Oct 28, 2014 at 5:34 PM, Harold Nguyen har...@nexgate.com wrote:

Hi all,

I'm having trouble troubleshooting this particular block of code for Spark Streaming and saving to Cassandra:

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    //-- Writing it to Cassandra
    wordCounts.saveToCassandra("test", "kv", SomeColumns("key", "value"), 1)

Could you tell me where I'm going wrong? Can I not call wordCounts.saveToCassandra? Here's the error:

Exception in thread "main" java.lang.NoClassDefFoundError: com/datastax/spark/connector/mapper/ColumnMapper

Thanks,
Harold
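An alternative to addJar, along the lines of Gerard's setJars hint, is to attach the jar when the conf is built. A sketch only; the jar path is a placeholder that must point at the connector jar on the driver machine:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch: ship the Cassandra connector jar with the job so executors can
    // load com.datastax.spark.connector classes. Path and host are placeholders.
    val conf = new SparkConf()
      .setAppName("streaming-to-cassandra")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setJars(Seq("/path/to/spark-cassandra-connector_2.10-1.1.0-alpha3.jar"))
    val ssc = new StreamingContext(conf, Seconds(10))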
Re: Submitting Spark job on Unix cluster from dev environment (Windows)
What are you trying to do? Connecting to a remote cluster from your local Windows Eclipse environment? Just make sure you meet the following:

1. Set spark.driver.host to your local IP (where you run your code; it should be accessible from the cluster).
2. Make sure no firewall/router configurations are blocking/filtering the connection between your Windows machine and the cluster. The best way to test would be to ping the Windows machine's public IP from your cluster. (And if the pinging works, then make sure you are port-forwarding the required ports.)
3. Also set spark.driver.port if you don't want to open up all the ports on your Windows machine (the default is random, so stick to one port).

Thanks
Best Regards

On Wed, Oct 29, 2014 at 1:39 AM, Shailesh Birari sbir...@wynyardgroup.com wrote:

Can anyone please help me here?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-tp16989p17552.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
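A sketch of what those two driver-side settings might look like in code; the IP, port and master URL are placeholders for whatever is actually reachable from the cluster in a given setup:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: fix the driver host/port so the cluster can call back to the
    // Windows machine. All values below are placeholders.
    val conf = new SparkConf()
      .setMaster("spark://unix-cluster-master:7077")
      .setAppName("windows-dev-driver")
      .set("spark.driver.host", "192.168.1.50")
      .set("spark.driver.port", "7777")
    val sc = new SparkContext(conf)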
Re: unsubscribe
Send it to user-unsubscr...@spark.apache.org. Read the community page https://spark.apache.org/community.html for more info Thanks Best Regards On Wed, Oct 29, 2014 at 3:32 AM, Ricky Thomas ri...@truedash.io wrote:
Re: Batch of updates
I don't think accumulators come into play here. Use foreachPartition, not mapPartitions.

On Wed, Oct 29, 2014 at 12:43 AM, Flavio Pompermaier pomperma...@okkam.it wrote:

Sorry but I wasn't able to code my stuff using accumulators as you suggested :(

In my use case I have to add elements to an array/list and then, every 100 elements, commit the batch to a solr index and then clear it. In the cleanup code I have to commit the uncommitted (remainder) elements. In the example you showed me I can't see how to append elements to a list and commit the remainder elements of my RDD.

Following the advice of Sean this is more or less what I have now, but still I don't know how to return empty iterators (and if I really have to) and if I still need to use accumulators to add elements to the list (in the code below I haven't specified what to use..):

    .mapPartitions { partition =>
      if (!partition.isEmpty) {
        // Some setup code here
        println("Initialize batch");
        partition.map(x => {
          var batch = ???
          batch.add(someClassFactory.fromString(x._2, x._3))
          if (batch.size % 100 == 0) {
            println("Flush legacy entities");
            batch.clear
          }
          if (!partition.hasNext) {
            // Some cleanup code here
            println("Flush legacy entities");
            batch.clear
          }
          Iterator.empty
        })
      } else {
        // return an empty Iterator of your return type
        Iterator.empty
      }

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
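A minimal sketch of the foreachPartition shape Sean suggests, using a mutable buffer per partition so the remainder can be flushed at the end without accumulators. The someClassFactory call is taken from Flavio's snippet, and the commented-out solrClient calls are stand-ins for whatever the actual Solr client exposes:

    rdd.foreachPartition { partition =>
      // Per-partition setup (e.g. create the Solr client/connection here).
      val batch = scala.collection.mutable.ArrayBuffer[AnyRef]()

      partition.foreach { x =>
        batch += someClassFactory.fromString(x._2, x._3)
        if (batch.size >= 100) {
          // commit the full batch here, e.g. solrClient.add(batch); solrClient.commit()
          batch.clear()
        }
      }

      // Flush whatever is left over for this partition.
      if (batch.nonEmpty) {
        // solrClient.add(batch); solrClient.commit()
        batch.clear()
      }
    }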
Re: Spark Streaming from Kafka
Looks like the kafka jar that you are using isn't compatible with your scala version.

Thanks
Best Regards

On Wed, Oct 29, 2014 at 11:50 AM, Harold Nguyen har...@nexgate.com wrote:

Hi,

Just wondering if you've seen the following error when reading from Kafka:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest

Thanks,
Harold
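For reference, a hedged sketch of what a consistent dependency set could look like when the cluster runs a Scala 2.10.x build of Spark. The version numbers are illustrative assumptions only, and the explicit kafka dependency may be unnecessary if spark-streaming-kafka already pulls in a matching one:

    // build.sbt (sketch): keep the Scala binary version consistent across Spark,
    // the Kafka receiver and the Kafka client itself (all _2.10 here).
    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"       % "1.1.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0",
      "org.apache.kafka"  % "kafka_2.10"            % "0.8.1.1"
    )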
Re: Use RDD like an Iterator
Call RDD.toLocalIterator()?

https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html

On Wed, Oct 29, 2014 at 4:15 AM, Dai, Kevin yun...@ebay.com wrote:

Hi, ALL

I have a RDD[T]; can I use it like an iterator? That means I can compute every element of this RDD lazily.

Best Regards,
Kevin.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
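A small sketch of what that looks like in practice; the iterator pulls one partition at a time to the driver, so only a single partition ever needs to fit in driver memory (the data here is a placeholder):

    // Sketch: iterate over an RDD lazily on the driver. Partitions are fetched
    // one by one as the iterator advances, instead of collect()-ing everything.
    val rdd = sc.parallelize(1 to 1000000, 100)
    val it: Iterator[Int] = rdd.toLocalIterator
    it.take(5).foreach(println)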
Re: Spark Streaming from Kafka
Thanks! How do I find out which Kafka jar to use for scala 2.10.4?

—
Sent from Mailbox

On Wed, Oct 29, 2014 at 12:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Looks like the kafka jar that you are using isn't compatible with your scala version.

Thanks
Best Regards
Re: Spark Streaming from Kafka
I'm using kafka_2.10-1.1.0.jar on spark 1.1.0

—
Sent from Mailbox

On Wed, Oct 29, 2014 at 12:31 AM, Harold Nguyen har...@nexgate.com wrote:

Thanks! How do I find out which Kafka jar to use for scala 2.10.4?
Spark streaming and save to cassandra and elastic search
Hi

I've written a Spark Streaming job which streams data from Flume to Kafka, which is received by Spark. I've used updateStateByKey and then, for each RDD, I'm saving the results into Cassandra and Elasticsearch (by calling 2 different methods).

The above parts are working fine when the streaming job is submitted using spark-submit directly on the server (via PuTTY). But if I open an ssh session from Java using JSch and try to run the spark-submit command, only updateStateByKey is working. The data is not getting saved to Elasticsearch and Cassandra. I've checked the Spark logs and there are no exceptions.

Any pointers to where I am going wrong. Thanks in Advance.

Aarthi

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-and-save-to-cassandra-and-elastic-search-tp17623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: pySpark - convert log/txt files into sequenceFile
Thank you Holden, it works!

    infile = sc.wholeTextFiles(sys.argv[1])
    rdd = sc.parallelize(infile.collect())
    rdd.saveAsSequenceFile(sys.argv[2])

Csaba

2014-10-28 17:56 GMT+01:00 Holden Karau hol...@pigscanfly.ca:

Hi Csaba,

It sounds like the API you are looking for is sc.wholeTextFiles :)

Cheers,

Holden :)

On Tuesday, October 28, 2014, Csaba Ragany rag...@gmail.com wrote:

Dear Spark Community,

Is it possible to convert text files (.log or .txt files) into sequencefiles in Python?

Using PySpark I can create a parallelized file with

    rdd = sc.parallelize([('key1', 1.0)])

and I can save it as a sequencefile with rdd.saveAsSequenceFile(). But how can I put the whole content of my text files into the 'value' of 'key1'? I want a sequencefile where the keys are the filenames of the text files and the values are their content.

Thank you for any help!
Csaba

--
Cell : 425-233-8271
Re: Use RDD like an Iterator
RDD.toLocalIterator() is the suitable solution. But I doubt whether it conforms with the design principle of Spark and RDDs: all RDD transforms are computed lazily until they end with some action.

2014-10-29 15:28 GMT+08:00 Sean Owen so...@cloudera.com:

Call RDD.toLocalIterator()?

https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html

On Wed, Oct 29, 2014 at 4:15 AM, Dai, Kevin yun...@ebay.com wrote:

Hi, ALL

I have a RDD[T]; can I use it like an iterator? That means I can compute every element of this RDD lazily.

Best Regards,
Kevin.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
sbt/sbt compile error [FATAL]
Hi,

I have cloned spark as:

    git clone g...@github.com:apache/spark.git
    cd spark
    sbt/sbt compile

Apparently http://repo.maven.apache.org/maven2 is no longer valid. See the error further below. Is this correct? And what should it be changed to?

Everything seems to go smoothly until:

[info] downloading https://repo1.maven.org/maven2/org/ow2/asm/asm-tree/5.0.3/asm-tree-5.0.3.jar ...
[info] [SUCCESSFUL ] org.ow2.asm#asm-tree;5.0.3!asm-tree.jar (709ms)
[info] Done updating.
[info] Compiling 1 Scala source to /root/spark/project/spark-style/target/scala-2.10/classes...
[info] Compiling 9 Scala sources to /root/.sbt/0.13/staging/ec3aa8f39111944cc5f2/sbt-pom-reader/target/scala-2.10/sbt-0.13/classes...
[warn] there were 1 deprecation warning(s); re-run with -deprecation for details
[warn] one warning found
[info] Compiling 3 Scala sources to /root/spark/project/target/scala-2.10/sbt-0.13/classes...
org.apache.maven.model.building.ModelBuildingException: 1 problem was encountered while building the effective model for org.apache.spark:spark-parent:1.2.0-SNAPSHOT
[FATAL] Non-resolvable parent POM: Could not transfer artifact org.apache:apache:pom:14 from/to central (http://repo.maven.apache.org/maven2): Error transferring file: repo.maven.apache.org from http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 'parent.relativePath' points at wrong local POM @ line 22, column 11

Regards
Hans

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-compile-error-FATAL-tp17629.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: sbt/sbt compile error [FATAL]
Are you trying to compile the master branch? Can you try branch-1.1?

On Wed, Oct 29, 2014 at 6:55 AM, HansPeterS hanspeter.sl...@gmail.com wrote:

Hi,

I have cloned spark as:

    git clone g...@github.com:apache/spark.git
    cd spark
    sbt/sbt compile

Apparently http://repo.maven.apache.org/maven2 is no longer valid. See the error further below. Is this correct? And what should it be changed to?
Spark 1.1.0 on Hive 0.13.1
Hi,

My Hive is 0.13.1, how to make Spark 1.1.0 run on Hive 0.13? Please advise.

Or, any news about when Spark 1.1.0 on Hive 0.13.1 will be available?

Regards
Arthur

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
RE: FileNotFoundException in appcache shuffle files
Hi Ryan - I've been fighting the exact same issue for well over a month now. I initially saw the issue in 1.0.2 but it persists in 1.1.

Jerry - I believe you are correct that this happens during a pause on long-running jobs on a large data set. Are there any parameters that you suggest tuning to mitigate these situations?

Also, you ask if there are any other exceptions - for me this error has tended to follow an earlier exception, which supports the theory that it is a symptom of an earlier problem. My understanding is as follows: during a shuffle step an executor fails and doesn't report its output; next, during the reduce step, that output can't be found where expected and rather than rerunning the failed execution, Spark goes down.

We can add my email thread to your reference list: https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201410.mbox/CAM-S9zS-+-MSXVcohWEhjiAEKaCccOKr_N5e0HPXcNgnxZd=h...@mail.gmail.com

-----Original Message-----
From: Shao, Saisai [saisai.s...@intel.com]
Sent: Wednesday, October 29, 2014 01:46 AM Eastern Standard Time
To: Ryan Williams
Cc: user
Subject: RE: FileNotFoundException in appcache shuffle files

Hi Ryan,

This is an issue from sort-based shuffle, not consolidated hash-based shuffle. I guess mostly this issue occurs when the Spark cluster is in an abnormal situation, maybe a long GC pause or something else; you can check the system status or whether there are any other exceptions besides this one.

Thanks
Jerry

From: nobigdealst...@gmail.com On Behalf Of Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:

14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
Re: Spark 1.1.0 on Hive 0.13.1
Spark 1.1.0 doesn't support Hive 0.13.1. We plan to support it in 1.2.0, and related PRs are already merged or being merged to the master branch. On 10/29/14 7:43 PM, arthur.hk.c...@gmail.com wrote: Hi, My Hive is 0.13.1, how to make Spark 1.1.0 run on Hive 0.13? Please advise. Or, any news about when will Spark 1.1.0 on Hive 0.1.3.1 be available? Regards Arthur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1.0 on Hive 0.13.1
Hi, Thanks for your update. Any idea when will Spark 1.2 be GA? Regards Arthur On 29 Oct, 2014, at 8:22 pm, Cheng Lian lian.cs@gmail.com wrote: Spark 1.1.0 doesn't support Hive 0.13.1. We plan to support it in 1.2.0, and related PRs are already merged or being merged to the master branch. On 10/29/14 7:43 PM, arthur.hk.c...@gmail.com wrote: Hi, My Hive is 0.13.1, how to make Spark 1.1.0 run on Hive 0.13? Please advise. Or, any news about when will Spark 1.1.0 on Hive 0.1.3.1 be available? Regards Arthur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark streaming and save to cassandra and elastic search
Yes the data is getting processed. I printed the data and the RDD count. The point where the data gets saved is not invoked. I am using the same class and jar for submitting by both methods; the only difference is that one is launched from Tomcat and the other directly via PuTTY.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-and-save-to-cassandra-and-elastic-search-tp17623p17635.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Streaming Question regarding lazy calculations
Hi All

I am using Spark Streaming with Kafka, running 24/7. My code is something like:

    JavaDStream<String> data = messages.map(new MapData());
    JavaPairDStream<String, Iterable<String>> records = data.mapToPair(new dataPair()).groupByKey(100);
    records.print();
    JavaPairDStream<String, Double> result = records.mapValues(new Sum()).updateStateByKey(updateFunction).cache();
    result.foreach {
      write(result, path); // writing result to the path
    }

Since result holds historical values, even when there is no input record for 10 min and no change in result, I tend to write it again and again every 3 secs.

I tried checking

    if (record.count() > 0) {
      result.foreach(write file)
    }

But Spark is not considering my check. Any insight on how to achieve it?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Question-regarding-lazy-calculations-tp17636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
CANNOT FIND ADDRESS
The Spark Application UI shows that one of the executors CANNOT FIND ADDRESS:

Aggregated Metrics by Executor

Executor ID: 0
  Address: mddworker1.c.fi-mdd-poc.internal:42197
  Task Time: 0 ms | Total Tasks: 0 | Failed Tasks: 0 | Succeeded Tasks: 0
  Input: 0.0 B | Shuffle Read: 136.1 MB | Shuffle Write: 184.9 MB
  Shuffle Spill (Memory): 146.8 GB | Shuffle Spill (Disk): 135.4 MB

Executor ID: 1
  Address: CANNOT FIND ADDRESS
  Task Time: 0 ms | Total Tasks: 0 | Failed Tasks: 0 | Succeeded Tasks: 0
  Input: 0.0 B | Shuffle Read: 87.4 MB | Shuffle Write: 142.0 MB
  Shuffle Spill (Memory): 61.4 GB | Shuffle Spill (Disk): 81.4 MB

I also see the following in one of the executor logs, for which the driver may have lost communication:

14/10/29 13:18:33 WARN : Master_Client Heartbeat last execution took 90859 ms. Longer than the FIXED_EXECUTION_INTERVAL_MS 5000
14/10/29 13:18:33 WARN : WorkerClientToWorkerHeartbeat last execution took 90859 ms. Longer than the FIXED_EXECUTION_INTERVAL_MS 1000
14/10/29 13:18:33 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)

I have also seen other variations of timeouts:

14/10/29 06:21:05 WARN SendingConnection: Error finishing connection to mddworker1.c.fi-mdd-poc.internal/10.240.179.241:40442
java.net.ConnectException: Connection refused
14/10/29 06:21:05 ERROR BlockManager: Failed to report broadcast_6_piece0 to master; giving up.

or

14/10/29 07:23:40 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
        at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:218)
        at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:58)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:310)
        at org.apache.spark.storage.BlockManager$$anonfun$reportAllBlocks$3.apply(BlockManager.scala:190)
        at org.apache.spark.storage.BlockManager$$anonfun$reportAllBlocks$3.apply(BlockManager.scala:188)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at org.apache.spark.util.TimeStampedHashMap.foreach(TimeStampedHashMap.scala:107)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.storage.BlockManager.reportAllBlocks(BlockManager.scala:188)
        at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:207)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:366)

How do I track down what is causing this problem? Any suggestion on a solution, debugging, or a workaround will be helpful!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
We used both spray and Akka. To avoid compatibility issues, we used the spark shaded akka version. It works for us. This is the 1.1.0 branch; I have not tried with the master branch.

Chester

Sent from my iPad

On Oct 28, 2014, at 11:48 PM, Prashant Sharma scrapco...@gmail.com wrote:

Yes we shade akka to change its protobuf version (If I am not wrong.). Yes, binary compatibility with other akka modules is compromised. One thing you can try is use akka from org.spark-project.akka, I have not tried this and not sure if its going to help you but maybe you could exclude the akka spray depends on and use the akka spark depends on.

Prashant Sharma

On Wed, Oct 29, 2014 at 9:27 AM, Jianshi Huang jianshi.hu...@gmail.com wrote:

I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right?

Jianshi

On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com wrote:

Try a version built with Akka 2.2.x

Mohammed

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Tuesday, October 28, 2014 3:03 AM
To: user
Subject: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

Hi,

I got the following exceptions when using Spray client to write to OpenTSDB using its REST API.

Exception in thread "pool-10-thread-2" java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;

It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using the latest Spark built from HEAD, so the Akka used in Spark-submit is 2.3.4-spark.

I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception.

Anyone has an idea what went wrong? Need help!

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github Blog: http://huangjs.github.com/

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github Blog: http://huangjs.github.com/
Re: problem with start-slaves.sh
I see this when I start a worker and then try to start it again, forgetting it's already running (I don't use start-slaves, I start the slaves individually with start-slave.sh). All this is telling you is that there is already a running process on that machine. You can see it if you do a ps -aef | grep worker. You can also look on the Spark UI and see if your master shows this machine as connected to it already. If it doesn't, you might want to kill the worker process and restart it.

On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote:

I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build with the PHive option to be able to interface with Hive).

I'm getting this:

ip_address: org.apache.spark.deploy.worker.Worker running as process . Stop it first.

Am I doing something wrong? In my specific case, shark+hive is running on the nodes. Does that interfere with spark?

Thank you,
Spark Performance
I am relatively new to spark processing. I am using the Spark Java API to process data. I am having trouble processing a data set that I don't think is significantly large. It is joining datasets that are around 3-4gb each (around 12 gb of data in total).

The workflow is:

    x = RDD1.keyBy(x).partitionBy(new HashPartitioner(10)).cache()
    y = RDD2.keyBy(x).partitionBy(new HashPartitioner(10)).cache()
    z = RDD3.keyBy(x).partitionBy(new HashPartitioner(10)).cache()
    o = RDD4.keyBy(y).partitionBy(new HashPartitioner(10)).cache()

    out = x.join(y).join(z).keyBy(y).partitionBy(new HashPartitioner(10)).cache().join(o)
    out.saveAsObject("Out");

The spark processor seems to be hung at the out= step indefinitely. I am using kryo for serialization, running local with SPARK_MEM=90g. I have 16 cpus and 108g ram. I am saving output to hadoop. I have also tried on a standalone cluster with 2 workers, 8 cpus and 52 gb ram. My VMs are on Google Cloud.

Below is the table from the completed stages (Tasks column is Succeeded/Total):

Stage Id | Description                  | Submitted        | Duration | Tasks | Input    | Shuffle Write
8        | keyBy at ProcessA.java:1094  | 10/27/2014 12:40 | 2.0 min  | 10/10 |          |
3        | filter at ProcessA.java:1079 | 10/27/2014 12:40 | 2.0 min  | 10/10 |          |
2        | keyBy at ProcessA.java:1071  | 10/27/2014 12:39 | 39 s     | 11/11 | 268.4 MB | 25.7 MB
1        | filter at ProcessA.java:1103 | 10/27/2014 12:39 | 16 s     | 9/9   | 58.8 MB  | 30.4 MB
7        | keyBy at ProcessA.java:1045  | 10/27/2014 12:39 | 32 s     | 24/24 | 2.8 GB   | 573.8 MB
6        | keyBy at ProcessA.java:1045  | 10/27/2014 12:39 | 40 s     | 11/11 | 268.4 MB | 24.5 MB

Some things I don't understand: I see output in the logfiles indicating it is spilling the in-memory map to disk, and the spill size is greater than the input. I am not sure how to avoid that or reduce it. I also tried cluster mode, where I observed the same behavior, so I question whether the processes are running in parallel or serially?

14/10/27 14:11:33 INFO collection.ExternalAppendOnlyMap: Thread 94 spilling in-memory map of 1000 MB to disk (15 times so far)
14/10/27 14:11:34 INFO collection.ExternalAppendOnlyMap: Thread 107 spilling in-memory map of 2351 MB to disk (2 times so far)
14/10/27 14:11:36 INFO collection.ExternalAppendOnlyMap: Thread 94 spilling in-memory map of 1000 MB to disk (16 times so far)
14/10/27 14:11:37 INFO collection.ExternalAppendOnlyMap: Thread 91 spilling in-memory map of 4781 MB to disk (10 times so far)
14/10/27 14:11:38 INFO collection.ExternalAppendOnlyMap: Thread 112 spilling in-memory map of 1243 MB to disk (10 times so far)
14/10/27 14:11:39 INFO collection.ExternalAppendOnlyMap: Thread 94 spilling in-memory map of 983 MB to disk (17 times so far)
14/10/27 14:11:39 INFO collection.ExternalAppendOnlyMap: Thread 96 spilling in-memory map of 75546 MB to disk (11 times so far)
14/10/27 14:11:56 INFO collection.ExternalAppendOnlyMap: Thread 106 spilling in-memory map of 2324 MB to disk (7 times so far)
14/10/27 14:11:56 INFO collection.ExternalAppendOnlyMap: Thread 112 spilling in-memory map of 1729 MB to disk (11 times so far)
14/10/27 14:11:58 INFO collection.ExternalAppendOnlyMap: Thread 96 spilling in-memory map of 2410 MB to disk (12 times so far)
14/10/27 14:11:58 INFO collection.ExternalAppendOnlyMap: Thread 91 spilling in-memory map of 1211 MB to disk

I would appreciate any pointers in the right direction!

By the way, I also see error messages like "Not enough space to cache partition rdd_21_4", indicating perhaps nothing is getting cached, per http://mail-archives.apache.org/mod_mbox/spark-issues/201409.mbox/%3cjira.12744773.141202099.148323.1412021014...@atlassian.jira%3E

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-tp17640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: CANNOT FIND ADDRESS
CANNOT FIND ADDRESS occurs when your executor has crashed. I'll look further down where it shows each task and see if you see any tasks failed. Then you can examine the error log of that executor and see why it died.

On Wed, Oct 29, 2014 at 9:35 AM, akhandeshi ami.khande...@gmail.com wrote:

SparkApplication UI shows that one of the executor Cannot find Addresss

How do I track down what is causing this problem? Any suggestion on solution, debugging or workaround will be helpful!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Java api overhead?
since spark holds data structures on heap (and by default tries to work with all data in memory) and its written in Scala seeing lots of scala Tuple2 is not unexpected. how do these numbers relate to your data size? On Oct 27, 2014 2:26 PM, Sonal Goyal sonalgoy...@gmail.com wrote: Hi, I wanted to understand what kind of memory overheads are expected if at all while using the Java API. My application seems to have a lot of live Tuple2 instances and I am hitting a lot of gc so I am wondering if I am doing something fundamentally wrong. Here is what the top of my heap looks like. I actually create reifier.tuple.Tuple objects and pass them to map methods and mostly return Tuple2Tuple,Tuple. The heap seems to have far too many Tuple2 and $colon$colon. num #instances #bytes class name -- 1: 85414872 2049956928 scala.collection.immutable.$colon$colon 2: 85414852 2049956448 scala.Tuple2 3:304221 14765832 [C 4:3029237270152 java.lang.String 5: 441112624624 [Ljava.lang.Object; 6: 12101495256 [B 7: 39839 956136 java.util.ArrayList 8:29 950736 [Lscala.concurrent.forkjoin.ForkJoinTask; 9: 8129 827792 java.lang.Class 10: 33839 812136 java.lang.Long 11: 33400 801600 reifier.tuple.Tuple 12: 6116 538208 java.lang.reflect.Method 13: 12767 408544 java.util.concurrent.ConcurrentHashMap$Node 14: 5994 383616 org.apache.spark.scheduler.ResultTask 15: 10298 329536 java.util.HashMap$Node 16: 11988 287712 org.apache.spark.rdd.NarrowCoGroupSplitDep 17: 5708 228320 reifier.block.Canopy 18: 9 215784 [Lscala.collection.Seq; 19: 12078 193248 java.lang.Integer 20: 12019 192304 java.lang.Object 21: 5708 182656 reifier.block.Tree 22: 2776 173152 [I 23: 6013 144312 scala.collection.mutable.ArrayBuffer 24: 5994 143856 [Lorg.apache.spark.rdd.CoGroupSplitDep; 25: 5994 143856 org.apache.spark.rdd.CoGroupPartition 26: 5994 143856 org.apache.spark.rdd.ShuffledRDDPartition 27: 4486 143552 java.util.Hashtable$Entry 28: 6284 132800 [Ljava.lang.Class; 29: 1819 130968 java.lang.reflect.Field 30: 605 101208 [Ljava.util.HashMap$Node; Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal
Re: Spark 1.1.0 on Hive 0.13.1
Sometime after Nov. 15: https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage On Wed, Oct 29, 2014 at 5:28 AM, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: Hi, Thanks for your update. Any idea when will Spark 1.2 be GA? Regards Arthur On 29 Oct, 2014, at 8:22 pm, Cheng Lian lian.cs@gmail.com wrote: Spark 1.1.0 doesn't support Hive 0.13.1. We plan to support it in 1.2.0, and related PRs are already merged or being merged to the master branch. On 10/29/14 7:43 PM, arthur.hk.c...@gmail.com wrote: Hi, My Hive is 0.13.1, how to make Spark 1.1.0 run on Hive 0.13? Please advise. Or, any news about when will Spark 1.1.0 on Hive 0.1.3.1 be available? Regards Arthur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: CANNOT FIND ADDRESS
Thanks...hmm It is seems to be a timeout issue perhaps?? Not sure what is causing it? or how to debug? I see following error message... 4/10/29 13:26:04 ERROR ContextCleaner: Error cleaning broadcast 9 akka.pattern.AskTimeoutException: Timed out at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$ $unbatchedExecute(Future.scala:694) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455) at akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407) at akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411) at akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363) at java.lang.Thread.run(Thread.java:745) 14/10/29 13:26:04 WARN BlockManagerMaster: Failed to remove broadcast 9 with removeFromMaster = true - Timed o ut} -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637p17646.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
XML Utilities for Apache Spark
I developed the spark-xml-utils library because we have a large amount of XML in big datasets and I felt this data could be better served by providing some helpful xml utilities. This includes the ability to filter documents based on an xpath/xquery expression, return specific nodes for an xpath/xquery expression, or transform documents using an xquery or an xslt stylesheet. By providing some basic wrappers to Saxon-HE, the spark-xml-utils library exposes some basic xpath, xslt, and xquery functionality that can readily be leveraged by any Spark application (including the spark-shell). We want to share this library with the community and are making it available under the Apache 2.0 license. For point of reference, I was able to parse and apply a fairly complex xpath expression against 2 million documents (130GB total and 75KB/doc average) in less than 3 minute on an AWS cluster (at spot price) costing less than $1/hr. When I have a chance, I will blog/write about some of my other investigations when using spark-xml-utils. More about the project is available on github(https://github.com/elsevierlabs/spark-xml-utils). There are examples for usage from the spark-shell as well as from a Java application. Feel free to use, contribute, and/or let us know how this library can be improved. Let me know if you have any questions. Darin.
Re: pySpark - convert log/txt files into sequenceFile
Without the second line, it will be much faster: infile = sc.wholeTextFiles(sys.argv[1]) infile.saveAsSequenceFile(sys.argv[2]) On Wed, Oct 29, 2014 at 3:31 AM, Csaba Ragany rag...@gmail.com wrote: Thank you Holden, it works! infile = sc.wholeTextFiles(sys.argv[1]) rdd = sc.parallelize(infile.collect()) rdd.saveAsSequenceFile(sys.argv[2]) Csaba 2014-10-28 17:56 GMT+01:00 Holden Karau hol...@pigscanfly.ca: Hi Csaba, It sounds like the API you are looking for is sc.wholeTextFiles :) Cheers, Holden :) On Tuesday, October 28, 2014, Csaba Ragany rag...@gmail.com wrote: Dear Spark Community, Is it possible to convert text files (.log or .txt files) into sequencefiles in Python? Using PySpark I can create a parallelized file with rdd=sc.parallelize([('key1', 1.0)]) and I can save it as a sequencefile with rdd.saveAsSequenceFile(). But how can I put the whole content of my text files into the 'value' of 'key1'? I want a sequencefile where the keys are the filenames of the text files and the values are their content. Thank you for any help! Csaba -- Cell : 425-233-8271 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
PySpark and Cassandra 2.1 Examples
Hey all, Just thought I'd share this with the list in case any one else would benefit. Currently working on a proper integration of PySpark and DataStax's new Cassandra-Spark connector, but that's on going. In the meanwhile, I've basically updated the cassandra_inputformat.py and cassandra_outputformat.py examples that come with Spark. https://github.com/Parsely/pyspark-cassandra. The new example shows reading and writing to Cassandra including proper handling of CQL 3.1 collections: lists, sets and maps. Think it also clarifies the format RDDs are required be in to write data to Cassandra https://github.com/Parsely/pyspark-cassandra/blob/master/src/main/python/pyspark_cassandra_hadoop_example.py#L83-L97 and provides a more general serializer https://github.com/Parsely/pyspark-cassandra/blob/master/src/main/scala/SparkConverters.scala#L34-L88 to write Python (serialized via Py4J) structs to Cassandra. Comments or questions are welcome. Will update the group again when we have support for the DataStax connector. -- Mike Sukmanowsky Aspiring Digital Carpenter *p*: +1 (416) 953-4248 *e*: mike.sukmanow...@gmail.com facebook http://facebook.com/mike.sukmanowsky | twitter http://twitter.com/msukmanowsky | LinkedIn http://www.linkedin.com/profile/view?id=10897143 | github https://github.com/msukmanowsky
Re: PySpark and Cassandra 2.1 Examples
Nice! - Helena @helenaedelson On Oct 29, 2014, at 12:01 PM, Mike Sukmanowsky mike.sukmanow...@gmail.com wrote: Hey all, Just thought I'd share this with the list in case any one else would benefit. Currently working on a proper integration of PySpark and DataStax's new Cassandra-Spark connector, but that's on going. In the meanwhile, I've basically updated the cassandra_inputformat.py and cassandra_outputformat.py examples that come with Spark. https://github.com/Parsely/pyspark-cassandra. The new example shows reading and writing to Cassandra including proper handling of CQL 3.1 collections: lists, sets and maps. Think it also clarifies the format RDDs are required be in to write data to Cassandra and provides a more general serializer to write Python (serialized via Py4J) structs to Cassandra. Comments or questions are welcome. Will update the group again when we have support for the DataStax connector. -- Mike Sukmanowsky Aspiring Digital Carpenter p: +1 (416) 953-4248 e: mike.sukmanow...@gmail.com facebook | twitter | LinkedIn | github
Spark Streaming with Kinesis
Hi all, I followed the guide here: http://spark.apache.org/docs/latest/streaming-kinesis-integration.html But got this error: Exception in thread main java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider Would you happen to know what dependency or jar is needed ? Harold
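A note on dependencies (this is not from the thread, so treat it as an assumption to verify): the Kinesis receiver ships in the separate spark-streaming-kinesis-asl artifact referenced by the integration guide above, which pulls in the AWS SDK (and with it com.amazonaws.auth.AWSCredentialsProvider) transitively. A minimal sbt sketch, with the version assumed to match Spark 1.1:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.1.0"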
Re: Unit Testing (JUnit) with Spark
Add "io.netty" % "netty" % "3.6.6.Final" to your dependencies, and add exclude("io.netty", "netty-all") to the end of the Spark and Hadoop dependencies. Reference: https://spark-project.atlassian.net/browse/SPARK-1138 I am using Spark 1.1, so the Akka issue is already fixed. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Testing-JUnit-with-Spark-tp10861p17652.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
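For reference, a sketch of how that advice might look in a build.sbt; the string quotes stripped by the archive are restored here, and the Spark and Hadoop artifact versions are assumptions:

libraryDependencies ++= Seq(
  "io.netty" % "netty" % "3.6.6.Final",
  // exclusion appended to the Spark and Hadoop artifacts, as suggested above
  "org.apache.spark" %% "spark-core" % "1.1.0" exclude("io.netty", "netty-all"),
  "org.apache.hadoop" % "hadoop-client" % "2.4.0" exclude("io.netty", "netty-all")
)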
Re: Transforming the Dstream vs transforming each RDDs in the Dstream.
Hi TD, Thanks a lot for the comprehensive answer. I think this explanation deserves some place in the Spark Streaming tuning guide. -kr, Gerard. On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Gerard, This is a very good question! *TL;DR: *The performance should be same, except in case of shuffle-based operations where the number of reducers is not explicitly specified. Let me answer in more detail by dividing the set of DStream operations into three categories. *1. Map-like operations (map, flatmap, filter, etc.) that does not involve any shuffling of data:* Performance should virtually be the same in both cases. Either ways, in each batch, the operations on the batch's RDD are first set on the driver, and then the actions like on the RDD are executed. There are very very minor differences in the two cases of early foreachRDD and late foreachRDD (e.x, cleaning up for function closures, etc.) but those should make almost not difference in the performance. *2. Operations involving shuffle: *Here is there is a subtle difference in both cases if the number of partitions is not specified. The default number of partitions used when using dstream.reduceByKey() and than when using dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play around with the number of reducers to see what performs better. But if the number of reducers is explicitly specified and is the same both cases, then the performance should be similar. Note that this difference in the default numbers are not guaranteed to be like this, it could change in future implementations. *3. Aggregation-like operations (count, reduce): *Here there is another subtle execution difference between - dstream.count() which produces a DStream of single-element RDDs, the element being the count, and - dstream.foreachRDD(_.count()) which returns the count directly. In the first case, some random worker node is chosen for the reduce, in another the driver is chosen for the reduce. There should not be a significant performance difference. *4. Other operations* including window ops and stateful ops (updateStateByKey), are obviously not part of the discussion as they cannot be (easily) done through early foreachRDD. Hope this helps! TD PS: Sorry for not noticing this question earlier. On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas gerard.m...@gmail.com wrote: PS: Just to clarify my statement: Unlike the feared RDD operations on the driver, it's my understanding that these Dstream ops on the driver are merely creating an execution plan for each RDD. With feared RDD operations on the driver I meant to contrast an rdd action like rdd.collect that would pull all rdd data to the driver, with dstream.foreachRDD(rdd = rdd.op) for which documentation says 'it runs on the driver' yet, all that it looks to be running on the driver is the scheduling of 'op' on that rdd, just like it happens for all rdd other operations (thanks to Sean for the clarification) So, not to move focus away from the original question: In Spark Streaming, would it be better to do foreachRDD early in a pipeline or instead do as much Dstream transformations before going into the foreachRDD call? 
Between these two pieces of code, from a performance perspective, what would be preferred and why: - Early foreachRDD: dstream.foreachRDD(rdd = val records = rdd.map(elem = record(elem)) targets.foreach(target = records.filter{record = isTarget(target,record)}.writeToCassandra(target,table)) ) - As most dstream transformations as possible before foreachRDD: val recordStream = dstream.map(elem = record(elem)) targets.foreach{target = recordStream.filter(record = isTarget(target,record)).foreachRDD(_.writeToCassandra(target,table))} ? kr, Gerard. On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Matt, Unlike the feared RDD operations on the driver, it's my understanding that these Dstream ops on the driver are merely creating an execution plan for each RDD. My question still remains: Is it better to foreachRDD early in the process or do as much Dstream transformations before going into the foreachRDD call? Maybe this will require some empirical testing specific to each implementation? -kr, Gerard. On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell matt.narr...@gmail.com wrote: http://spark.apache.org/docs/latest/streaming-programming-guide.html foreachRDD is executed on the driver…. mn On Oct 20, 2014, at 3:07 AM, Gerard Maas gerard.m...@gmail.com wrote: Pinging TD -- I'm sure you know :-) -kr, Gerard. On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, We have been implementing several Spark Streaming jobs that are basically processing data and inserting it into Cassandra, sorting it among different keyspaces. We've been following the pattern:
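The two snippets compared above lost their => arrows and quotes in the archive; here is the same early-versus-late foreachRDD comparison as a readable sketch. record(...), isTarget(...), targets, table and writeToCassandra(...) are the poster's own names quoted from the thread, not a concrete library API:

// Early foreachRDD: drop to RDD operations at the top of the pipeline.
dstream.foreachRDD { rdd =>
  val records = rdd.map(elem => record(elem))
  targets.foreach { target =>
    records.filter(r => isTarget(target, r)).writeToCassandra(target, table)
  }
}

// Late foreachRDD: keep DStream transformations until the output action.
val recordStream = dstream.map(elem => record(elem))
targets.foreach { target =>
  recordStream.filter(r => isTarget(target, r)).foreachRDD(_.writeToCassandra(target, table))
}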
Re: Spark Streaming with Kinesis
Hi again, After getting through several dependencies, I finally got to this non-dependency type error: Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V It looks very similar to this post: http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi Since I'm a little new to everything, would someone be able to provide step-by-step guidance for that? Harold On Wed, Oct 29, 2014 at 9:22 AM, Harold Nguyen har...@nexgate.com wrote: Hi all, I followed the guide here: http://spark.apache.org/docs/latest/streaming-kinesis-integration.html But got this error: Exception in thread main java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider Would you happen to know what dependency or jar is needed ? Harold
Spark SQL and confused about number of partitions/tasks to do a simple join.
I have a SchemaRDD with 100 records in 1 partition. We'll call this baseline. I have a SchemaRDD with 11 records in 1 partition. We'll call this daily. After a fairly basic join of these two tables JavaSchemaRDD results = sqlContext.sql(SELECT id, action, daily.epoch, daily.version FROM baseline, daily WHERE key=id AND action='u' AND daily.epoch baseline.epoch).cache(); I get a new SchemaRDD results with only 6 records (and the RDD has 200 partitions). When the job runs, I can see that 200 tasks were used to do this join. Does this make sense? I'm currently not doing anything special along the lines of partitioning (such as hash). Even if 200 tasks would have been required, since the result is only 6 (shouldn't some of these empty partitions been 'deleted'). I'm using Apache Spark 1.1 and I'm running this in local mode (localhost[1]). Any insight would be appreciated. Thanks. Darin.
RE: how to retrieve the value of a column of type date/timestamp from a Spark SQL Row
Thanks, guys. Michael Armbrust also suggested the same two approaches. I believe “getAs[Date]” is available only in 1.2 branch and I have Spark 1.1, so I am using row(i).asInstanceOf[Date], which works. Mohammed From: Shixiong Zhu [mailto:zsxw...@gmail.com] Sent: Tuesday, October 28, 2014 10:23 PM To: Zhan Zhang Cc: Mohammed Guller; user@spark.apache.org Subject: Re: how to retrieve the value of a column of type date/timestamp from a Spark SQL Row Or def getAs[T](i: Int): T Best Regards, Shixiong Zhu 2014-10-29 13:16 GMT+08:00 Zhan Zhang zzh...@hortonworks.commailto:zzh...@hortonworks.com: Can you use row(i).asInstanceOf[] Thanks. Zhan Zhang On Oct 28, 2014, at 5:03 PM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: Hi – The Spark SQL Row class has methods such as getInt, getLong, getBoolean, getFloat, getDouble, etc. However, I don’t see a getDate method. So how can one retrieve a date/timestamp type column from a result set? Thanks, Mohammed CONFIDENTIALITY NOTICE NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
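A minimal sketch of the two approaches mentioned above, assuming `row` is a Row from the query result and that column 3 holds the timestamp; the column index and the java.sql.Timestamp type are assumptions that depend on the table definition:

import java.sql.Timestamp

// Spark 1.1: cast the untyped value.
val ts1 = row(3).asInstanceOf[Timestamp]
// Later branches: use the typed accessor quoted above, def getAs[T](i: Int): T.
val ts2 = row.getAs[Timestamp](3)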
Re: Transforming the Dstream vs transforming each RDDs in the Dstream.
Good idea, will do for 1.2 release. On Oct 29, 2014 9:50 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi TD, Thanks a lot for the comprehensive answer. I think this explanation deserves some place in the Spark Streaming tuning guide. -kr, Gerard. On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Gerard, This is a very good question! *TL;DR: *The performance should be same, except in case of shuffle-based operations where the number of reducers is not explicitly specified. Let me answer in more detail by dividing the set of DStream operations into three categories. *1. Map-like operations (map, flatmap, filter, etc.) that does not involve any shuffling of data:* Performance should virtually be the same in both cases. Either ways, in each batch, the operations on the batch's RDD are first set on the driver, and then the actions like on the RDD are executed. There are very very minor differences in the two cases of early foreachRDD and late foreachRDD (e.x, cleaning up for function closures, etc.) but those should make almost not difference in the performance. *2. Operations involving shuffle: *Here is there is a subtle difference in both cases if the number of partitions is not specified. The default number of partitions used when using dstream.reduceByKey() and than when using dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play around with the number of reducers to see what performs better. But if the number of reducers is explicitly specified and is the same both cases, then the performance should be similar. Note that this difference in the default numbers are not guaranteed to be like this, it could change in future implementations. *3. Aggregation-like operations (count, reduce): *Here there is another subtle execution difference between - dstream.count() which produces a DStream of single-element RDDs, the element being the count, and - dstream.foreachRDD(_.count()) which returns the count directly. In the first case, some random worker node is chosen for the reduce, in another the driver is chosen for the reduce. There should not be a significant performance difference. *4. Other operations* including window ops and stateful ops (updateStateByKey), are obviously not part of the discussion as they cannot be (easily) done through early foreachRDD. Hope this helps! TD PS: Sorry for not noticing this question earlier. On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas gerard.m...@gmail.com wrote: PS: Just to clarify my statement: Unlike the feared RDD operations on the driver, it's my understanding that these Dstream ops on the driver are merely creating an execution plan for each RDD. With feared RDD operations on the driver I meant to contrast an rdd action like rdd.collect that would pull all rdd data to the driver, with dstream.foreachRDD(rdd = rdd.op) for which documentation says 'it runs on the driver' yet, all that it looks to be running on the driver is the scheduling of 'op' on that rdd, just like it happens for all rdd other operations (thanks to Sean for the clarification) So, not to move focus away from the original question: In Spark Streaming, would it be better to do foreachRDD early in a pipeline or instead do as much Dstream transformations before going into the foreachRDD call? 
Between these two pieces of code, from a performance perspective, what would be preferred and why: - Early foreachRDD: dstream.foreachRDD(rdd = val records = rdd.map(elem = record(elem)) targets.foreach(target = records.filter{record = isTarget(target,record)}.writeToCassandra(target,table)) ) - As most dstream transformations as possible before foreachRDD: val recordStream = dstream.map(elem = record(elem)) targets.foreach{target = recordStream.filter(record = isTarget(target,record)).foreachRDD(_.writeToCassandra(target,table))} ? kr, Gerard. On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Matt, Unlike the feared RDD operations on the driver, it's my understanding that these Dstream ops on the driver are merely creating an execution plan for each RDD. My question still remains: Is it better to foreachRDD early in the process or do as much Dstream transformations before going into the foreachRDD call? Maybe this will require some empirical testing specific to each implementation? -kr, Gerard. On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell matt.narr...@gmail.com wrote: http://spark.apache.org/docs/latest/streaming-programming-guide.html foreachRDD is executed on the driver…. mn On Oct 20, 2014, at 3:07 AM, Gerard Maas gerard.m...@gmail.com wrote: Pinging TD -- I'm sure you know :-) -kr, Gerard. On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, We have been implementing several Spark Streaming jobs that are basically processing data and inserting
Re: Transforming the Dstream vs transforming each RDDs in the Dstream.
Hi tathagata. I actually had a few minor improvements to spark streaming in SPARK-4040. possibly i could weave this in w/ my pr ? On Wed, Oct 29, 2014 at 1:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Good idea, will do for 1.2 release. On Oct 29, 2014 9:50 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi TD, Thanks a lot for the comprehensive answer. I think this explanation deserves some place in the Spark Streaming tuning guide. -kr, Gerard. On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Gerard, This is a very good question! *TL;DR: *The performance should be same, except in case of shuffle-based operations where the number of reducers is not explicitly specified. Let me answer in more detail by dividing the set of DStream operations into three categories. *1. Map-like operations (map, flatmap, filter, etc.) that does not involve any shuffling of data:* Performance should virtually be the same in both cases. Either ways, in each batch, the operations on the batch's RDD are first set on the driver, and then the actions like on the RDD are executed. There are very very minor differences in the two cases of early foreachRDD and late foreachRDD (e.x, cleaning up for function closures, etc.) but those should make almost not difference in the performance. *2. Operations involving shuffle: *Here is there is a subtle difference in both cases if the number of partitions is not specified. The default number of partitions used when using dstream.reduceByKey() and than when using dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play around with the number of reducers to see what performs better. But if the number of reducers is explicitly specified and is the same both cases, then the performance should be similar. Note that this difference in the default numbers are not guaranteed to be like this, it could change in future implementations. *3. Aggregation-like operations (count, reduce): *Here there is another subtle execution difference between - dstream.count() which produces a DStream of single-element RDDs, the element being the count, and - dstream.foreachRDD(_.count()) which returns the count directly. In the first case, some random worker node is chosen for the reduce, in another the driver is chosen for the reduce. There should not be a significant performance difference. *4. Other operations* including window ops and stateful ops (updateStateByKey), are obviously not part of the discussion as they cannot be (easily) done through early foreachRDD. Hope this helps! TD PS: Sorry for not noticing this question earlier. On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas gerard.m...@gmail.com wrote: PS: Just to clarify my statement: Unlike the feared RDD operations on the driver, it's my understanding that these Dstream ops on the driver are merely creating an execution plan for each RDD. With feared RDD operations on the driver I meant to contrast an rdd action like rdd.collect that would pull all rdd data to the driver, with dstream.foreachRDD(rdd = rdd.op) for which documentation says 'it runs on the driver' yet, all that it looks to be running on the driver is the scheduling of 'op' on that rdd, just like it happens for all rdd other operations (thanks to Sean for the clarification) So, not to move focus away from the original question: In Spark Streaming, would it be better to do foreachRDD early in a pipeline or instead do as much Dstream transformations before going into the foreachRDD call? 
Between these two pieces of code, from a performance perspective, what would be preferred and why: - Early foreachRDD: dstream.foreachRDD(rdd = val records = rdd.map(elem = record(elem)) targets.foreach(target = records.filter{record = isTarget(target,record)}.writeToCassandra(target,table)) ) - As most dstream transformations as possible before foreachRDD: val recordStream = dstream.map(elem = record(elem)) targets.foreach{target = recordStream.filter(record = isTarget(target,record)).foreachRDD(_.writeToCassandra(target,table))} ? kr, Gerard. On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Matt, Unlike the feared RDD operations on the driver, it's my understanding that these Dstream ops on the driver are merely creating an execution plan for each RDD. My question still remains: Is it better to foreachRDD early in the process or do as much Dstream transformations before going into the foreachRDD call? Maybe this will require some empirical testing specific to each implementation? -kr, Gerard. On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell matt.narr...@gmail.com wrote: http://spark.apache.org/docs/latest/streaming-programming-guide.html foreachRDD is executed on the driver…. mn On Oct 20, 2014, at 3:07 AM, Gerard Maas gerard.m...@gmail.com wrote: Pinging TD -- I'm sure
winutils
Apparently Spark does require Hadoop even if you do not intend to use Hadoop. Is there a workaround for the below error I get when creating the SparkContext in Scala? I will note that I didn't have this problem yesterday when creating the Spark context in Java as part of the getting started App. It could be because I was using Maven project to manage dependencies and that did something for me or else JavaSparkContext has some different code. I would say, in order for Spark to be general purpose this is a pretty big bug since now it appears Spark depends upon Hadoop. Could not locate executable null\bin\winutils.exe in the Hadoop binaries
Re: winutils
QQ - did you download the Spark 1.1 binaries that included the Hadoop one? Does this happen if you're using the Spark 1.1 binaries that do not include the Hadoop jars? On Wed, Oct 29, 2014 at 11:31 AM, Ron Ayoub ronalday...@live.com wrote: Apparently Spark does require Hadoop even if you do not intend to use Hadoop. Is there a workaround for the below error I get when creating the SparkContext in Scala? I will note that I didn't have this problem yesterday when creating the Spark context in Java as part of the getting started App. It could be because I was using Maven project to manage dependencies and that did something for me or else JavaSparkContext has some different code. I would say, in order for Spark to be general purpose this is a pretty big bug since now it appears Spark depends upon Hadoop. Could not locate executable null\bin\winutils.exe in the Hadoop binaries
Re: CANNOT FIND ADDRESS
Can you try setting the following while creating the sparkContext and see if the issue still exists? .set(spark.core.connection.ack.wait.timeout,900) .set(spark.akka.frameSize,50) .set(spark.akka.timeout,900) Looks like your executor is stuck on GC Pause. Thanks Best Regards On Wed, Oct 29, 2014 at 9:20 PM, akhandeshi ami.khande...@gmail.com wrote: Thanks...hmm It is seems to be a timeout issue perhaps?? Not sure what is causing it? or how to debug? I see following error message... 4/10/29 13:26:04 ERROR ContextCleaner: Error cleaning broadcast 9 akka.pattern.AskTimeoutException: Timed out at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$ $unbatchedExecute(Future.scala:694) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455) at akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407) at akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411) at akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363) at java.lang.Thread.run(Thread.java:745) 14/10/29 13:26:04 WARN BlockManagerMaster: Failed to remove broadcast 9 with removeFromMaster = true - Timed o ut} -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637p17646.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
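With the quotes (stripped by the archive) restored, the suggested settings would look roughly like this when building the context; the app name is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.core.connection.ack.wait.timeout", "900")
  .set("spark.akka.frameSize", "50")
  .set("spark.akka.timeout", "900")
val sc = new SparkContext(conf)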
Re: Spark SQL and confused about number of partitions/tasks to do a simple join.
ok. after reading some documentation, it would appear the issue is the default number of partitions for a join (200). After doing something like the following, I was able to change the value. From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: User user@spark.apache.org Sent: Wednesday, October 29, 2014 1:55 PM Subject: Spark SQL and confused about number of partitions/tasks to do a simple join. I have a SchemaRDD with 100 records in 1 partition. We'll call this baseline. I have a SchemaRDD with 11 records in 1 partition. We'll call this daily. After a fairly basic join of these two tables JavaSchemaRDD results = sqlContext.sql(SELECT id, action, daily.epoch, daily.version FROM baseline, daily WHERE key=id AND action='u' AND daily.epoch baseline.epoch).cache(); I get a new SchemaRDD results with only 6 records (and the RDD has 200 partitions). When the job runs, I can see that 200 tasks were used to do this join. Does this make sense? I'm currently not doing anything special along the lines of partitioning (such as hash). Even if 200 tasks would have been required, since the result is only 6 (shouldn't some of these empty partitions been 'deleted'). I'm using Apache Spark 1.1 and I'm running this in local mode (localhost[1]). Any insight would be appreciated. Thanks. Darin.
Re: Spark SQL and confused about number of partitions/tasks to do a simple join.
Sorry, hit the send key a bitt too early. Anyway, this is the code I set. sqlContext.sql(set spark.sql.shuffle.partitions=10); From: Darin McBeath ddmcbe...@yahoo.com To: Darin McBeath ddmcbe...@yahoo.com; User user@spark.apache.org Sent: Wednesday, October 29, 2014 2:47 PM Subject: Re: Spark SQL and confused about number of partitions/tasks to do a simple join. ok. after reading some documentation, it would appear the issue is the default number of partitions for a join (200). After doing something like the following, I was able to change the value. From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: User user@spark.apache.org Sent: Wednesday, October 29, 2014 1:55 PM Subject: Spark SQL and confused about number of partitions/tasks to do a simple join. I have a SchemaRDD with 100 records in 1 partition. We'll call this baseline. I have a SchemaRDD with 11 records in 1 partition. We'll call this daily. After a fairly basic join of these two tables JavaSchemaRDD results = sqlContext.sql(SELECT id, action, daily.epoch, daily.version FROM baseline, daily WHERE key=id AND action='u' AND daily.epoch baseline.epoch).cache(); I get a new SchemaRDD results with only 6 records (and the RDD has 200 partitions). When the job runs, I can see that 200 tasks were used to do this join. Does this make sense? I'm currently not doing anything special along the lines of partitioning (such as hash). Even if 200 tasks would have been required, since the result is only 6 (shouldn't some of these empty partitions been 'deleted'). I'm using Apache Spark 1.1 and I'm running this in local mode (localhost[1]). Any insight would be appreciated. Thanks. Darin.
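Put together, the workaround reads roughly as below (a Scala sketch assuming a SQLContext named sqlContext; the query is shortened and omits the epoch comparison from the original post):

// Lower the default 200 shuffle partitions before running the join, then query as usual.
sqlContext.sql("set spark.sql.shuffle.partitions=10")
val results = sqlContext.sql(
  "SELECT id, action FROM baseline, daily WHERE key = id AND action = 'u'").cache()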
RE: winutils
Well. I got past this problem and the manner was in my own email. I did download the one with Hadoop since it was among the only ones you don't have to compile from source along with CDH and Map. It worked yesterday because I added 1.1.0 as a maven dependency from the repository. I just did the same thing again and it worked perfect. One peculiarity I will mention is that even with Scala IDE installed in Eclipse when I created the Maven project per instructions on the web and installed the connector I still did not get the Scala perspective nor right clicking and being able to add Scala types. This time around, I used the Scala IDE project wizard to create a simple non-Maven app and then converted it to Maven and all features seem to work fine. I will also note that I'm learning Java, Scala, Eclipse, Spark, Maven all at the same time. Kind of overkill. But part of the frustration was following along with the Maven Scala project instructions using an archetype badly out of date. So now I think I found the a good approach to getting up and running with spark (1. Eclipse, 2. Scala IDE, 3. Scala Wizard Project, 4. Convert to Maven, 5. Add Spark dependency). Date: Wed, 29 Oct 2014 11:38:23 -0700 Subject: Re: winutils From: denny.g@gmail.com To: ronalday...@live.com CC: user@spark.apache.org QQ - did you download the Spark 1.1 binaries that included the Hadoop one? Does this happen if you're using the Spark 1.1 binaries that do not include the Hadoop jars? On Wed, Oct 29, 2014 at 11:31 AM, Ron Ayoub ronalday...@live.com wrote: Apparently Spark does require Hadoop even if you do not intend to use Hadoop. Is there a workaround for the below error I get when creating the SparkContext in Scala? I will note that I didn't have this problem yesterday when creating the Spark context in Java as part of the getting started App. It could be because I was using Maven project to manage dependencies and that did something for me or else JavaSparkContext has some different code. I would say, in order for Spark to be general purpose this is a pretty big bug since now it appears Spark depends upon Hadoop. Could not locate executable null\bin\winutils.exe in the Hadoop binaries
Questions about serialization and SparkConf
Assume in my executor I say: SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.kryo.registrator", "com.lordjoe.distributed.hydra.HydraKryoSerializer"); sparkConf.set("mysparc.data", "Some user Data"); sparkConf.setAppName("Some App"); Now 1) Are there default values, set in some system file, which are populated if I call new SparkConf - and if not, how do I get those? I think I see defaults for the master, the serializer... 2) If I set a property in the SparkConf for my SparkContext, will I see that property on a slave machine? 3) If I set a property and then call showSparkProperties(), do I see that property set, and if not, how can I see the property set - say in another thread on the executor, as in showSparkPropertiesInAnotherThread()? 4) How can a slave machine access properties set on the executor? I am really interested in sparkConf.set("spark.kryo.registrator", "com.lordjoe.distributed.hydra.HydraKryoSerializer"), which needs to be used by the slave.

/** * dump all spark properties to System.err */
public static void showSparkProperties() {
    SparkConf sparkConf = new SparkConf();
    Tuple2<String, String>[] all = sparkConf.getAll();
    for (Tuple2<String, String> prp : all) {
        System.err.println(prp._1().toString() + " = " + prp._2());
    }
}

public static void showSparkPropertiesInAnotherThread() {
    new Thread(new Runnable() {
        @Override
        public void run() {
            showSparkProperties();
        }
    }).start();
}
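These are open questions in the post, but a rough way to probe the second and fourth ones is sketched below (a Scala sketch, not from the thread): set a spark.* property on the conf used to build the SparkContext, then read it back inside a task via SparkEnv. Whether non-spark.* keys such as mysparc.data reach the executors is left unverified here.

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object ConfVisibilityCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("conf-visibility-check")
      .set("spark.kryo.registrator", "com.lordjoe.distributed.hydra.HydraKryoSerializer")
    val sc = new SparkContext(conf)

    sc.parallelize(1 to 4, 4).foreachPartition { _ =>
      // Runs on an executor; SparkEnv.get.conf is the conf the executor was started with.
      println(SparkEnv.get.conf.get("spark.kryo.registrator", "<not set>"))
    }
    sc.stop()
  }
}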
Re: Spark Streaming with Kinesis
I haven't tried this myself yet, but this sounds relevant: https://github.com/apache/spark/pull/2535 Will be giving this a try today or so, will report back. On Wednesday, October 29, 2014, Harold Nguyen har...@nexgate.com wrote: Hi again, After getting through several dependencies, I finally got to this non-dependency type error: Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V It look every similar to this post: http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi Since I'm a little new to everything, would someone be able to provide a step-by-step guidance for that ? Harold On Wed, Oct 29, 2014 at 9:22 AM, Harold Nguyen har...@nexgate.com javascript:_e(%7B%7D,'cvml','har...@nexgate.com'); wrote: Hi all, I followed the guide here: http://spark.apache.org/docs/latest/streaming-kinesis-integration.html But got this error: Exception in thread main java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider Would you happen to know what dependency or jar is needed ? Harold
Re: Selecting Based on Nested Values using Language Integrated Query Syntax
We are working on more helpful error messages, but in the meantime let me explain how to read this output. org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'p.name,'p.age, tree: Project ['p.name,'p.age] Filter ('location.number = 2300) Join Inner, Some((location#110.number AS number#111 = 'ln.streetnumber)) Generate explode(locations#10), true, false, Some(l) LowerCaseSchema Subquery p Subquery people SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) LowerCaseSchema Subquery ln Subquery locationNames SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) 'tickedFields indicate a failure to resolve, where as numbered#10 attributes have been resolved. (The numbers are globally unique and can be used to disambiguate where a column is coming from when the names are the same) Resolution happens bottom up. So the first place that there is a problem is 'ln.streetnumber, which prevents the rest of the query from resolving. If you look at the subquery ln, it is only producing two columns: locationName and locationNumber. So streetnumber is not valid. On Tue, Oct 28, 2014 at 8:02 PM, Corey Nolet cjno...@gmail.com wrote: scala locations.queryExecution warning: there were 1 feature warning(s); re-run with -feature for details res28: _4.sqlContext.QueryExecution forSome { val _4: org.apache.spark.sql.SchemaRDD } = == Parsed Logical Plan == SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) == Analyzed Logical Plan == SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) == Optimized Logical Plan == SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) == Physical Plan == ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38 Code Generation: false == RDD == scala people.queryExecution warning: there were 1 feature warning(s); re-run with -feature for details res29: _5.sqlContext.QueryExecution forSome { val _5: org.apache.spark.sql.SchemaRDD } = == Parsed Logical Plan == SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) == Analyzed Logical Plan == SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) == Optimized Logical Plan == SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) == Physical Plan == ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38 Code Generation: false == RDD == Here's when I try executing the join and the lateral view explode() : 14/10/28 23:05:35 INFO ParseDriver: Parse Completed org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'p.name,'p.age, tree: Project ['p.name,'p.age] Filter ('location.number = 2300) Join Inner, Some((location#110.number AS number#111 = 'ln.streetnumber)) Generate explode(locations#10), true, false, Some(l) LowerCaseSchema Subquery p Subquery people SparkLogicalPlan (ExistingRdd [age#9,locations#10,name#11], MappedRDD[28] at map at JsonRDD.scala:38) LowerCaseSchema Subquery ln Subquery locationNames SparkLogicalPlan (ExistingRdd [locationName#80,locationNumber#81], MappedRDD[99] at map at JsonRDD.scala:38) at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at
RE: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
I am not sure about that. Can you try a Spray version built with 2.2.x along with Spark 1.1 and include the Akka dependencies in your project’s sbt file? Mohammed From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Tuesday, October 28, 2014 8:58 PM To: Mohammed Guller Cc: user Subject: Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right? Jianshi On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: Try a version built with Akka 2.2.x Mohammed From: Jianshi Huang [mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com] Sent: Tuesday, October 28, 2014 3:03 AM To: user Subject: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Spark SQL - how to query dates stored as millis?
I have been searching and have not found a solution as to how one might query on dates stored as UTC milliseconds from the epoch. The schema I have pulled in from a NoSQL datasource (JSON from MongoDB) has the target date as: |-- dateCreated: struct (nullable = true) ||-- $date: long (nullable = true) and my goal is to write queries such along the lines of: SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1] Of course wrapped in the Spark specific sqlContext.sql(SELECT myStuff BLAH BLAH).collect... I am new to both Scala and Spark, so forgive me if this is an elementary question, but my searches have turned up empty. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-how-to-query-dates-stored-as-millis-tp17670.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
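One general pattern (a sketch, not a tested answer): compute the boundary timestamps as epoch millis on the driver and compare the long column with BETWEEN. The table and column names below are hypothetical, and how best to address the nested $date field from SQL (escaping it, or renaming it while loading the JSON) is left open:

import java.text.SimpleDateFormat

val fmt = new SimpleDateFormat("yyyy-MM-dd")
val start = fmt.parse("2014-10-01").getTime  // epoch millis, hypothetical boundaries
val end = fmt.parse("2014-10-31").getTime
val counts = sqlContext.sql(
  s"SELECT COUNT(*) FROM myTable WHERE createdMillis BETWEEN $start AND $end")
counts.collect().foreach(println)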
Re: winutils
cf. https://issues.apache.org/jira/browse/SPARK-2356 On Wed, Oct 29, 2014 at 7:31 PM, Ron Ayoub ronalday...@live.com wrote: Apparently Spark does require Hadoop even if you do not intend to use Hadoop. Is there a workaround for the below error I get when creating the SparkContext in Scala? I will note that I didn't have this problem yesterday when creating the Spark context in Java as part of the getting started App. It could be because I was using Maven project to manage dependencies and that did something for me or else JavaSparkContext has some different code. I would say, in order for Spark to be general purpose this is a pretty big bug since now it appears Spark depends upon Hadoop. Could not locate executable null\bin\winutils.exe in the Hadoop binaries - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
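For completeness, a workaround often used on Windows (it is not from this thread, so verify it for your setup): Hadoop's Shell utility looks for winutils.exe under hadoop.home.dir or HADOOP_HOME, so pointing it at a directory containing bin\winutils.exe silences the error even when HDFS is not used.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical path; set this before creating the SparkContext,
// or set the HADOOP_HOME environment variable instead.
System.setProperty("hadoop.home.dir", "C:\\hadoop")
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("test"))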
Convert DStream to String
Hi all, How do I convert a DStream to a string ? For instance, I want to be able to: val myword = words.filter(word = word.startsWith(blah)) And use myword in other places, like tacking it onto (key, value) pairs, like so: val pairs = words.map(word = (myword+_+word, 1)) Thanks for any help, Harold
what does DStream.union() do?
The documentation at https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.streaming.dstream.DStream describes the union() method as Return a new DStream by unifying data of another DStream with this DStream. Can somebody provide a clear definition of what unifying means in this context? Does it append corresponding elements together? Inside a wider tuple if need be? I'm hoping for something clear enough that it could just be added to the doc page if the developers so chose. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-does-DStream-union-do-tp17673.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
BUG: when running as extends App, closures don't capture variables
Greetings! This might be a documentation issue as opposed to a coding issue, in that perhaps the correct answer is "don't do that", but as this is not obvious, I am writing. The following code produces output most would not expect:

package misc

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object DemoBug extends App {
  val conf = new SparkConf()
  val sc = new SparkContext(conf)

  val rdd = sc.parallelize(List("A", "B", "C", "D"))
  val str1 = "A"

  val rslt1 = rdd.filter(x => { x != "A" }).count
  val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count

  println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
}

This produces the output: DemoBug: rslt1 = 3 rslt2 = 0

Compiled with sbt: libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"
Run on an EC2 EMR instance with a recent image (hadoop 2.4.0, spark 1.1.0). If instead there is a proper main(), it works as expected. Thank you. Sincerely, Mike
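The poster notes that the problem goes away with an explicit main(); for reference, a minimal sketch of that variant, where both counts come out as 3:

package misc

import org.apache.spark.{SparkConf, SparkContext}

object DemoNoBug {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List("A", "B", "C", "D"))
    val str1 = "A"  // a local val in main is captured correctly by the closures

    val rslt1 = rdd.filter(x => x != "A").count
    val rslt2 = rdd.filter(x => str1 != null && x != "A").count

    println("DemoNoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
    sc.stop()
  }
}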
Re: what does DStream.union() do?
The union function simply returns a DStream with the elements from both. This is the same behavior as when we call union on RDDs :) (You can think of union as similar to the union operator on sets except without the unique element restrictions). On Wed, Oct 29, 2014 at 3:15 PM, spr s...@yarcdata.com wrote: The documentation at https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.streaming.dstream.DStream describes the union() method as Return a new DStream by unifying data of another DStream with this DStream. Can somebody provide a clear definition of what unifying means in this context? Does it append corresponding elements together? Inside a wider tuple if need be? I'm hoping for something clear enough that it could just be added to the doc page if the developers so chose. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-does-DStream-union-do-tp17673.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Cell : 425-233-8271
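A tiny sketch of the semantics described above, using two file streams with the same element type (the directory names are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val ssc = new StreamingContext(new SparkConf().setAppName("union-demo"), Seconds(10))
val a: DStream[String] = ssc.textFileStream("dirA")
val b: DStream[String] = ssc.textFileStream("dirB")
val both: DStream[String] = a.union(b)  // every element from both streams, duplicates kept
both.print()
ssc.start()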
how to extract/combine elements of an Array in DStream element?
I am processing a log file, from each line of which I want to extract the zeroth and 4th elements (and an integer 1 for counting) into a tuple. I had hoped to be able to index the Array for elements 0 and 4, but Arrays appear not to support vector indexing. I'm not finding a way to extract and combine the elements properly, perhaps due to being a SparkStreaming/Scala newbie. My code so far looks like: 1]var lines = ssc.textFileStream(dirArg) 2]var linesArray = lines.map( line = (line.split(\t))) 3]var respH = linesArray.map( lineArray = lineArray(4) ) 4a] var time = linesArray.map( lineArray = lineArray(0) ) 4b] var time = linesArray.map( lineArray = (lineArray(0), 1)) 5]var newState = respH.union(time) If I use line 4a and not 4b, it compiles properly. (I still have issues getting my update function to updateStateByKey working, so don't know if it _works_ properly.) If I use line 4b and not 4a, it fails at compile time with [error] foo.scala:82: type mismatch; [error] found : org.apache.spark.streaming.dstream.DStream[(String, Int)] [error] required: org.apache.spark.streaming.dstream.DStream[String] [error] var newState = respH.union(time) This implies that the DStreams being union()ed have to be of identical per-element type. Can anyone confirm that's true? If so, is there a way to extract the needed elements and build the new DStream? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-extract-combine-elements-of-an-Array-in-DStream-element-tp17676.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to extract/combine elements of an Array in DStream element?
On Wed, Oct 29, 2014 at 3:29 PM, spr s...@yarcdata.com wrote: I am processing a log file, from each line of which I want to extract the zeroth and 4th elements (and an integer 1 for counting) into a tuple. I had hoped to be able to index the Array for elements 0 and 4, but Arrays appear not to support vector indexing. I'm not finding a way to extract and combine the elements properly, perhaps due to being a SparkStreaming/Scala newbie. My code so far looks like: 1]var lines = ssc.textFileStream(dirArg) 2]var linesArray = lines.map( line = (line.split(\t))) 3]var respH = linesArray.map( lineArray = lineArray(4) ) 4a] var time = linesArray.map( lineArray = lineArray(0) ) 4b] var time = linesArray.map( lineArray = (lineArray(0), 1)) 5]var newState = respH.union(time) If I use line 4a and not 4b, it compiles properly. (I still have issues getting my update function to updateStateByKey working, so don't know if it _works_ properly.) If I use line 4b and not 4a, it fails at compile time with [error] foo.scala:82: type mismatch; [error] found : org.apache.spark.streaming.dstream.DStream[(String, Int)] [error] required: org.apache.spark.streaming.dstream.DStream[String] [error] var newState = respH.union(time) This implies that the DStreams being union()ed have to be of identical per-element type. Can anyone confirm that's true? Yes. As shown in the scaladoc/javadoc they have to be the same type. ( http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream ) If so, is there a way to extract the needed elements and build the new DStream? Maybe you can say what you want your new DStream to look like? If you just want to extract the zero and fourth elements and have them together I'd do the extraction in a single map e.g. something like val iLikeCoffeeDStream = linesArrayDStream.map(lineArray = (lineArray(0), lineArray(4)) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-extract-combine-elements-of-an-Array-in-DStream-element-tp17676.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Cell : 425-233-8271
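Expanding that suggestion into a small sketch: do the extraction in a single map so the stream stays homogeneous, and attach the count of 1 there as well. The directory name is a placeholder; the field positions and tab separator follow the original post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("log-fields"), Seconds(10))
val lines = ssc.textFileStream("logDir")
// ((time, respH), 1) pairs, ready for reduceByKey or updateStateByKey, no union needed
val keyed = lines.map(_.split("\t")).map(a => ((a(0), a(4)), 1))
keyed.print()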
Spark related meet up on Nov 6th in SF
Hi all, We’re organizing a meet up on Nov 6th in our office downtown SF that might be of interest to the Spark community. We will be discussing our experience building our first production Spark based application. More details and sign up info here: https://www.eventbrite.com/e/from-hadoop-to-spark-in-4-months-lessons-learned-tickets-13681302143 Thanks, Alexis
Re: Convert DStream to String
What would it mean to make a DStream into a String? It's inherently a sequence of things over time, each of which might be a string but which are usually RDDs of things. On Wed, Oct 29, 2014 at 11:15 PM, Harold Nguyen har...@nexgate.com wrote: Hi all, How do I convert a DStream to a string? For instance, I want to be able to: val myword = words.filter(word => word.startsWith("blah")) And use myword in other places, like tacking it onto (key, value) pairs, like so: val pairs = words.map(word => (myword + "_" + word, 1)) Thanks for any help, Harold - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: what does DStream.union() do?
I need more precision to understand. If the elements of one DStream/RDD are (String) and the elements of the other are (Time, Int), what does union mean? I'm hoping for (String, Time, Int) but that appears optimistic. :) Do the elements have to be of homogeneous type? Holden Karau wrote The union function simply returns a DStream with the elements from both. This is the same behavior as when we call union on RDDs :) (You can think of union as similar to the union operator on sets except without the unique element restrictions). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-does-DStream-union-do-tp17673p17682.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Convert DStream to String
Hi Sean, I'd just like to take the first word of every line, and use it as a variable for later. Is there a way to do that? Here's the gist of what I want to do: val lines = KafkaUtils.createStream(ssc, "localhost:2181", "test", Map("test" -> 10)).map(_._2) val words = lines.flatMap(_.split(" ")) val acct = words.filter(word => word.startsWith("SECRETWORD")) val pairs = words.map(word => (acct + "_" + word, 1)) Take all lines coming into Kafka, and prepend the 'acct' word to each word. As an example, here is a line: hello world you are SECRETWORDthebest hello world And it should do this: (SECRETWORDthebest_hello, 2), (SECRETWORDthebest_world, 2), (SECRETWORDthebest_you, 1), etc... Harold On Wed, Oct 29, 2014 at 3:36 PM, Sean Owen so...@cloudera.com wrote: What would it mean to make a DStream into a String? It's inherently a sequence of things over time, each of which might be a string but which are usually RDDs of things. On Wed, Oct 29, 2014 at 11:15 PM, Harold Nguyen har...@nexgate.com wrote: Hi all, How do I convert a DStream to a string? For instance, I want to be able to: val myword = words.filter(word => word.startsWith("blah")) And use myword in other places, like tacking it onto (key, value) pairs, like so: val pairs = words.map(word => (myword + "_" + word, 1)) Thanks for any help, Harold
Re: what does DStream.union() do?
On Wed, Oct 29, 2014 at 3:39 PM, spr s...@yarcdata.com wrote: I need more precision to understand. If the elements of one DStream/RDD are (String) and the elements of the other are (Time, Int), what does union mean? I'm hoping for (String, Time, Int) but that appears optimistic. :) It won't compile. Do the elements have to be of homogeneous type? Yes. From the scaladoc ( http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream ) you can see DStreams are generic/templated on a type (T), and the union function only works on a DStream of the same templated type. If you have heterogeneous data you can first map each DStream to a case class with Options, or try something like http://stackoverflow.com/questions/3508077/does-scala-have-type-disjunction-union-types Holden Karau wrote The union function simply returns a DStream with the elements from both. This is the same behavior as when we call union on RDDs :) (You can think of union as similar to the union operator on sets, except without the unique element restrictions). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-does-DStream-union-do-tp17673p17682.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Cell : 425-233-8271
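If the two streams really do need to be combined, a hedged illustration of the case-class-with-Options idea mentioned above (the stream names respH and timeCounts are assumed from the earlier thread, not part of this reply):

    // Hypothetical common element type covering both shapes
    case class Event(host: Option[String], time: Option[String], count: Option[Int])

    // respH: DStream[String], timeCounts: DStream[(String, Int)]  -- assumed names
    val left  = respH.map(h => Event(Some(h), None, None))
    val right = timeCounts.map { case (t, n) => Event(None, Some(t), Some(n)) }

    // Both sides now share an element type, so union compiles
    val merged = left.union(right)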
Re: BUG: when running as extends App, closures don't capture variables
Good catch! If you'd like, you can send a pull request changing the files in docs/ to do this (see https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark), otherwise maybe open an issue on https://issues.apache.org/jira/browse/SPARK so we can track it. Matei On Oct 29, 2014, at 3:16 PM, Michael Albert m_albert...@yahoo.com.INVALID wrote: Greetings! This might be a documentation issue as opposed to a coding issue, in that perhaps the correct answer is "don't do that", but as this is not obvious, I am writing. The following code produces output most would not expect: package misc import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object DemoBug extends App { val conf = new SparkConf() val sc = new SparkContext(conf) val rdd = sc.parallelize(List("A", "B", "C", "D")) val str1 = "A" val rslt1 = rdd.filter(x => { x != "A" }).count val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2) } This produces the output: DemoBug: rslt1 = 3 rslt2 = 0 Compiled with sbt: libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0" Run on an EC2 EMR instance with a recent image (hadoop 2.4.0, spark 1.1.0). If instead there is a proper main(), it works as expected. Thank you. Sincerely, Mike
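For reference, a sketch of the workaround hinted at above: move the job into an explicit main() so that values like str1 are ordinary locals rather than delayed-initialization fields of the App trait, which may still be uninitialized (null) when the closure runs on the executors:

    import org.apache.spark.{SparkConf, SparkContext}

    object DemoFixed {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf())
        val rdd = sc.parallelize(List("A", "B", "C", "D"))
        val str1 = "A"
        // str1 is captured as an initialized local here, so both counts agree (3 and 3)
        val rslt1 = rdd.filter(x => x != "A").count
        val rslt2 = rdd.filter(x => str1 != null && x != "A").count
        println("DemoFixed: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
      }
    }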
Re: Convert DStream to String
Sure, that code looks like it does sort of what you describe but it's mixed up in a few ways. It looks like you only want to operate on words that start with SECRETWORD, but then you are prepending acct and _ in the code but expecting something appending in the result. You also seem like you want to sum by key so there needs to be a reduceByKeyAndWindow in here somewhere, or else a foreachRDD and reduceByKey. The result is not a sequence of (word,count), but a sequence of RDDs of (word,count). On Wed, Oct 29, 2014 at 11:40 PM, Harold Nguyen har...@nexgate.com wrote: Hi Sean, I'd just like to take the first word of every line, and use it as a variable for later. Is there a way to do that? Here's the gist of what I want to do: val lines = KafkaUtils.createStream(ssc, localhost:2181, test, Map(test - 10)).map(_._2) val words = lines.flatMap(_.split( )) val acct = words.filter(word = word.startsWith(SECRETWORD)) val pairs = words.map(word = (acct+_+word, 1)) Take all lines coming into Kafka, and add the word 'acct' to each word. As an example, here is a line: hello world you are SECRETWORDthebest hello world And it should do this: (SECRETWORDthebest_hello, 2), (SECRETWORDthebest_world, 2), (SECRETWORDthebest_you, 1), etc... Harold On Wed, Oct 29, 2014 at 3:36 PM, Sean Owen so...@cloudera.com wrote: What would it mean to make a DStream into a String? it's inherently a sequence of things over time, each of which might be a string but which are usually RDDs of things. On Wed, Oct 29, 2014 at 11:15 PM, Harold Nguyen har...@nexgate.com wrote: Hi all, How do I convert a DStream to a string ? For instance, I want to be able to: val myword = words.filter(word = word.startsWith(blah)) And use myword in other places, like tacking it onto (key, value) pairs, like so: val pairs = words.map(word = (myword+_+word, 1)) Thanks for any help, Harold - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
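One hedged way to put Sean's points together: derive the "secret" key inside a single transformation over each line, rather than trying to pull it out of the DStream as a driver-side String, and do the per-key summing with reduceByKeyAndWindow. The window and slide durations below are placeholders, and SECRETWORD is taken from the original post:

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._   // pair operations on DStreams (Spark 1.1)

    // For each line, find the word starting with SECRETWORD (if any) and pair it
    // with every other word on that line, then sum the counts per key over a window
    val pairs = lines.flatMap { line =>
      val words = line.split(" ")
      words.find(_.startsWith("SECRETWORD")) match {
        case Some(acct) =>
          words.filterNot(_.startsWith("SECRETWORD")).map(w => (acct + "_" + w, 1)).toSeq
        case None => Seq.empty[(String, Int)]
      }
    }
    val counts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

On the example line above, a single batch would yield (SECRETWORDthebest_hello, 2), (SECRETWORDthebest_world, 2), (SECRETWORDthebest_you, 1), and so on, as a DStream of (word, count) RDDs rather than a plain sequence.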
Spark with HLists
I tried using shapeless HLists as data storage for data inside Spark. Unsurprisingly, it failed. The deserialization isn't well-defined because of all the implicits used by shapeless. How could I make it work? Sample Code: /* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import shapeless._ import ops.hlist._ object SimpleApp { def main(args: Array[String]) { val logFile = "/tmp/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData .map(line => line :: HNil) .filter(_.select[String].contains("a")) .count() println("Lines with a: %s".format(numAs)) } } Error: Exception in thread main java.lang.NoClassDefFoundError: shapeless/$colon$colon at SimpleApp$.main(SimpleApp.scala:15) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How does custom partitioning in PySpark work?
I want several RDDs (which are the result of my program's operations on existing RDDs) to match the partitioning of an existing RDD, since they will be joined together in the end. Do I understand correctly that I would benefit from using a custom partitioner that would be applied to all RDDs? Secondly, how do I accomplish this in PySpark? The docs barely mention it, and the only thing I could find was: partitionBy(self, numPartitions, partitionFunc=portable_hash) What is this partitionFunc, and how do I use it to create something like HashPartitioner that I can re-use for multiple RDDs? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-custom-partitioning-in-PySpark-work-tp17688.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
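Not a Python-specific answer, but for comparison, this is what re-using one partitioner across several RDDs looks like in the Scala API; in PySpark, calling partitionBy with the same numPartitions and the same partitionFunc on each RDD plays the role that sharing the HashPartitioner instance plays below (rddA, rddB and the partition count are assumed names, not from the post):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair RDD operations (Spark 1.1)

    // One partitioner shared by every RDD that will later be joined
    val partitioner = new HashPartitioner(64)
    val a = rddA.partitionBy(partitioner)    // rddA, rddB: RDD[(String, V)] -- assumed
    val b = rddB.partitionBy(partitioner)
    // Co-partitioned inputs let the join proceed without re-shuffling both sides
    val joined = a.join(b)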
Re: Spark with HLists
looks like a misssing class issue? what makes you think its serialization? shapeless does indeed have a lot of helper classes that get sucked in and are not serializable. see here: https://groups.google.com/forum/#!topic/shapeless-dev/05_DXnoVnI4 and for a project that uses shapeless in spark see here: https://github.com/tresata/spark-columnar On Wed, Oct 29, 2014 at 7:05 PM, Simon Hafner reactorm...@gmail.com wrote: I tried using shapeless HLists as data storage for data inside spark. Unsurprisingly, it failed. The deserialization isn't well-defined because of all the implicits used by shapeless. How could I make it work? Sample Code: /* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import shapeless._ import ops.hlist._ object SimpleApp { def main(args: Array[String]) { val logFile = /tmp/README.md // Should be some file on your system val conf = new SparkConf().setAppName(Simple Application) val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData .map(line = line :: HNil) .filter(_.select[String].contains(a)) .count() println(Lines with a: %s.format(numAs)) } } Error: Exception in thread main java.lang.NoClassDefFoundError: shapeless/$colon$colon at SimpleApp$.main(SimpleApp.scala:15) at SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
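For completeness, the usual fix for this kind of NoClassDefFoundError is simply making sure the shapeless jar reaches the driver and executors, either by passing it to spark-submit with --jars or by building the application as a fat/assembly jar. A hedged build.sbt sketch; the shapeless coordinates and versions here are illustrative, not taken from the thread, and the exact artifact name depends on the Scala version in use:

    // shapeless must NOT be "provided" -- it has to travel with the application jar
    libraryDependencies += "com.chuusai" % "shapeless_2.10.4" % "2.0.0"
    // Spark can stay provided, since the cluster supplies it
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"

With sbt-assembly, running sbt assembly then produces a single jar to hand to spark-submit; listing that jar's contents and checking for shapeless/$colon$colon is a quick way to confirm the class actually made it in.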
use additional EBS volumes for HDFS storage with spark-ec2
I started my EC2 Spark cluster with ./ec2/spark-ec2 --ebs-vol-{size=100,num=8,type=gp2} -t m3.xlarge -s 10 launch mycluster I see the additional volumes attached, but they do not seem to be set up for HDFS. How can I check whether they are being utilized on all workers, and how can I get all workers to utilize the extra volumes for HDFS? I do not have experience using Hadoop directly, only through Spark. thanks Daniel
SparkSQL: Nested Query error
Hi, I am using Spark 1.1.0. I have the following SQL statement where I am trying to count the number of UIDs that are in the tusers table but not in the device table. val users_with_no_device = sql_cxt.sql("SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device)") I am getting the following error: Exception in thread main java.lang.RuntimeException: [1.61] failure: string literal expected SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device) I am not sure if every subquery has to be a string, so I tried to enclose the subquery in its own string literal, i.e. NOT IN ("SELECT d_uid FROM device"), but that resulted in a compilation error. What is the right way to frame the above query in Spark SQL? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Nested-Query-error-tp17691.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?
jira created with comments/references to this discussion: https://issues.apache.org/jira/browse/SPARK-4144 On Tue, Aug 19, 2014 at 4:47 PM, Xiangrui Meng men...@gmail.com wrote: No. Please create one but it won't be able to catch the v1.1 train. -Xiangrui On Tue, Aug 19, 2014 at 4:22 PM, Chris Fregly ch...@fregly.com wrote: this would be awesome. did a jira get created for this? I searched, but didn't find one. thanks! -chris On Tue, Jul 8, 2014 at 1:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Thanks a lot Xiangrui. This will help. On Wed, Jul 9, 2014 at 1:34 AM, Xiangrui Meng men...@gmail.com wrote: Hi Rahul, We plan to add online model updates with Spark Streaming, perhaps in v1.1, starting with linear methods. Please open a JIRA for Naive Bayes. For Naive Bayes, we need to update the priors and conditional probabilities, which means we should also remember the number of observations for the updates. Best, Xiangrui On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, I am using the MLlib Naive Bayes for a text classification problem. I have very less amount of training data. And then the data will be coming continuously and I need to classify it as either A or B. I am training the MLlib Naive Bayes model using the training data but next time when data comes, I want to predict its class and then incorporate that also in the model for next time prediction of new data(I think that is obvious). So I am not able to figure out what is the way to do that using MLlib Naive Bayes. Is it that I have to train the model on the whole data every time new data comes in?? Thanks in Advance! -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Re: Submitting Spark job on Unix cluster from dev environment (Windows)
Thanks. By setting the driver host to the Windows machine and specifying some ports (driver, fileserver, broadcast, etc.) it worked perfectly. I needed to specify those ports because not all ports are open on my machine. As for the driver host name, I had assumed Spark would pick it up itself, since on Linux we do not set it; I thought it was only needed when the driver host differs from the machine the program is launched from. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-tp16989p17693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
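For anyone searching later, a hedged sketch of the kind of configuration being described here; the property names are standard Spark 1.1 settings, but the master URL, host name and port numbers are placeholders for whatever the firewall allows:

    val conf = new SparkConf()
      .setMaster("spark://unix-cluster-master:7077")            // placeholder master URL
      .setAppName("FromWindowsDev")
      .set("spark.driver.host", "my-windows-box.example.com")   // must be reachable from the cluster
      .set("spark.driver.port", "51000")                        // fixed ports so they can be opened
      .set("spark.fileserver.port", "51001")
      .set("spark.broadcast.port", "51002")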
Task Size Increases when using loops
Hi, I'm new to Spark, and am facing a peculiar problem. I'm writing a simple Java driver program where I'm creating a key/value data structure and collecting it once created. The problem I'm facing is that when I increase the iterations of the for loop which creates the ArrayList of Long values that I put into the key/value data structure and save in Spark as a Java collection, the serialized size of tasks also increases proportionately. e.g.: for loop count 10, task size 1120 bytes; for loop count 1, task size 33402 bytes; for loop count 1000, task size 453434 bytes; etc. I'm not able to understand why the task size increases. I tried to run the same example via the Spark shell, and I noticed the task size remains the same, irrespective of the loop iteration count. Code: @Override public void execute() { // do something List<Long> numbers = new ArrayList<Long>(); JavaRDD<Long> distData = null; JavaPairRDD<String, Long> mapOfKeys = null; JavaRDD<String> keysRDD = null; class ByKeyImpl implements Function<Long, String>, Serializable { private static final long serialVersionUID = 5749098182016143296L; public String call(Long paramT1) throws Exception { StringBuilder builder = new StringBuilder(); builder.append(paramT1).append(',').append(paramT1 + 1); return builder.toString(); } } System.out.println("** STARTING BENCHMARK EXAMPLE ... **"); while (true) { System.out.println("** DO YOU WANT TO CONTINUE ? (YES/NO) **"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { String continueString = reader.readLine(); if ("yes".equalsIgnoreCase(continueString)) { if (numbers.size() == 0) { // list not populated yet for (long i = 0; i < num; i++) { numbers.add(i); } } // at this time numbers has Long values in it; check whether the RDD has already been created if (distData == null) { System.out.println("NEW RDD CREATED."); if (numPartitions > 0) { distData = sc.parallelize(numbers, numPartitions); } else { distData = sc.parallelize(numbers); } } // at this time the RDD is already present or newly created; check if the map is null or not if (mapOfKeys == null) { mapOfKeys = distData.keyBy(new ByKeyImpl()); keysRDD = mapOfKeys.keys(); keysRDD.persist(StorageLevel.MEMORY_ONLY());
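The growth is most likely coming from sc.parallelize(numbers): the driver-side ArrayList is what gets split across partitions and serialized into the tasks themselves, so a larger list directly means larger tasks. A hedged Scala sketch of the usual workaround, shipping only small indices and generating the heavy values on the executors; num and numPartitions are the variables from the code above, everything else is illustrative:

    import org.apache.spark.SparkContext._   // pair RDD operations (Spark 1.1)

    // Ship only partition numbers; build the Long values inside each partition
    val perPartition = num / numPartitions
    val distData = sc.parallelize(0L until numPartitions.toLong, numPartitions)
      .flatMap(p => (p * perPartition) until ((p + 1) * perPartition))

    // Same "v,v+1" keying as the ByKeyImpl function in the Java driver
    val keysRDD = distData.map(v => (s"$v,${v + 1}", v)).keys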
GC Issues with randomSplit on large dataset
Hey all – not writing to necessarily get a fix but more to get an understanding of what’s going on internally here. I wish to take a cross-product of two very large RDDs (using cartesian), the product of which is well in excess of what can be stored on disk . Clearly that is intractable, thus my solution is to do things in batches - essentially I can take the cross product of a small piece of the first data set with the entirety of the other. To do this, I calculate how many items can fit into 1 gig of memory. Next, I use RDD.random Split() to partition the first data set. The issue is that I am trying to partition an RDD of several million items into several million partitions. This throws the following error: I would like to understand the internals of what’s going on here so that I can adjust my approach accordingly. Thanks in advance. 14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Exception in thread main java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at java.lang.String.substring(String.java:1913) at java.lang.String.subSequence(String.java:1946) at java.util.regex.Matcher.getSubSequence(Matcher.java:1245) at java.util.regex.Matcher.group(Matcher.java:490) at java.util.Formatter$FormatSpecifier.init(Formatter.java:2675) at java.util.Formatter.parse(Formatter.java:2528) at java.util.Formatter.format(Formatter.java:2469) at java.util.Formatter.format(Formatter.java:2423) at java.lang.String.format(String.java:2790) at scala.collection.immutable.StringLike$class.format(StringLike.scala:266) at scala.collection.immutable.StringOps.format(StringOps.scala:31) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:944) at org.apache.spark.rdd.RDD.init(RDD.scala:1227) at org.apache.spark.rdd.RDD.init(RDD.scala:83) at org.apache.spark.rdd.PartitionwiseSampledRDD.init(PartitionwiseSampledRDD.scala:47) at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:378) at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:377) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD.randomSplit(RDD.scala:379) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
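randomSplit builds one RDD object per requested split on the driver (a PartitionwiseSampledRDD each, plus its call site, which is exactly what the stack trace shows being allocated), so asking for millions of splits means millions of driver-side RDDs before any work starts. A hedged sketch of an alternative that keeps a single RDD and peels off one batch at a time by index; the names bigRdd and otherRdd and the batch count are placeholders, not from the post:

    import org.apache.spark.SparkContext._   // pair RDD operations (Spark 1.1)

    // Tag each element with a stable batch id instead of materializing millions of RDDs
    val numBatches = 10000L
    val indexed = bigRdd.zipWithIndex()
      .map { case (x, i) => (i % numBatches, x) }
      .cache()

    for (b <- 0L until numBatches) {
      val batch   = indexed.filter { case (id, _) => id == b }.values
      val crossed = batch.cartesian(otherRdd)
      // ... write out or aggregate `crossed` here before moving to the next batch ...
    }

The trade-off is one filtering pass over the cached, indexed RDD per batch, but driver memory stays flat.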
spark-submit results in NoClassDefFoundError
Hi, I am trying to get my Spark application to run on YARN and by now I have managed to build a fat jar as described on http://markmail.org/message/c6no2nyaqjdujnkq (which is the only really usable manual on how to get such a jar file). My code runs fine using sbt test and sbt run, but when running ~/spark-1.1.0-bin-hadoop2.4/bin/spark-submit \ --class my.spark.MyClass --master local[3] \ target/scala-2.10/myclass-assembly-1.0.jar I get: Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread main java.lang.NoClassDefFoundError: com/typesafe/scalalogging/slf4j/Logger at java.lang.Class.getDeclaredMethods0(Native Method) at java.lang.Class.privateGetDeclaredMethods(Class.java:2615) at java.lang.Class.getMethod0(Class.java:2856) at java.lang.Class.getMethod(Class.java:1668) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: com.typesafe.scalalogging.slf4j.Logger at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 7 more ABRT problem creation: 'success' It seems to run into an error before it does anything with my jar? I am using com.typesafe.scala-logging %% scala-logging-slf4j% 2.1.2 instead of com.typesafe %% scalalogging-slf4j% 1.1.0 in my SBT file, could that be a reason? Thanks Tobias
Re: spark-submit results in NoClassDefFoundError
Hi again, On Thu, Oct 30, 2014 at 11:50 AM, Tobias Pfeiffer t...@preferred.jp wrote: Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread main java.lang.NoClassDefFoundError: com/typesafe/scalalogging/slf4j/Logger It turned out scalalogging was not included in the fat jar due to https://github.com/sbt/sbt-assembly/issues/116. I am using com.typesafe.scala-logging %% scala-logging-slf4j% 2.1.2 instead of com.typesafe %% scalalogging-slf4j% 1.1.0 in my SBT file, could that be a reason? So yes, that was the reason, in a way... however, I decided to include scala in the fat jar instead of modifying all my logging code... Tobias
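For anyone else who lands here, a hedged build.sbt sketch of the shape of the fix; the scala-logging coordinates are the ones mentioned in the thread, while the "provided" scoping of Spark is illustrative:

    // Must end up inside the assembly jar, so do not mark it "provided"
    libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
    // The cluster supplies Spark, so it can stay out of the fat jar
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"

Listing the contents of the assembled jar and checking for com/typesafe/scalalogging/slf4j/Logger.class (the class named in the error) is a quick way to verify it actually made it in before submitting.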
Re: Questions about serialization and SparkConf
Hello Steve . 1) When you call new SparkConf you should get an object with the default config values. You can reference the spark configuration and tuning pages for details on what those are. 2) Yes. Properties set in this configuration will be pushed down to worker nodes actually executing the spark job. The way this is done is through the instance of a SparkContext which accepts the SparkConf as a parameter. This shared config is what will be used by all RDDs and processes spawned as a function of this context. E.g. when creating a new RDD with sc.parallelize() or reading a text file in with sc.textFile() . I think that to address 3-4 you should reason in terms of the SparkContext. In short, you shouldn't need to worry about explicitly controlling what is happening on the slave nodes. Spark should abstract away that layer so that you can write parallelizable code that the resource manager i.e. YARN pushes out to your cluster. On Oct 29, 2014 2:58 PM, Steve Lewis lordjoe2...@gmail.com wrote: Assume in my executor I say SparkConf sparkConf = new SparkConf(); sparkConf.set(spark.kryo.registrator, com.lordjoe.distributed.hydra.HydraKryoSerializer); sparkConf.set(mysparc.data, Some user Data); sparkConf.setAppName(Some App); Now 1) Are there default values set in some system file which are populated if I call new SparkConf - if not how do I get those? _ I think i see defaults foe the master, the Serializer... 2) If I set a property in SparkConf for my SparkContext will I see that property in a Slave machine? 3) If I set a property anf then call showSparkProperties() do I see that property set and if not how can I see the property set - say in another thread as in if in some other thread on the executor say as in showSparkPropertiesInAnotherThread(); 4) How can a slave machine access properties set on the executor I an really interested in sparkConf.set(spark.kryo.registrator, com.lordjoe.distributed.hydra.HydraKryoSerializer); which needs to be used by the Slave /** * dump all spark properties to System.err */ public static void showSparkProperties() { SparkConf sparkConf = new SparkConf(); Tuple2String, String[] all = sparkConf.getAll(); for (Tuple2String, String prp : all) { System.err.println(prp._1().toString() + = + prp._2()); } } public static void showSparkPropertiesInAnotherThread() { new Thread(new Runnable() { @Override public void run() { showSparkProperties(); } }).start(); }
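A hedged Scala sketch of the points above: defaults come from spark-defaults.conf and spark.* system properties when the SparkConf is built on the driver, spark.* settings travel to the executors with the SparkContext, and arbitrary application data is better shipped explicitly (for example with a broadcast variable) than tucked into the conf. The property values are the ones from the question; the rest is illustrative:

    val conf = new SparkConf()   // picks up spark-defaults.conf (via spark-submit) and -Dspark.* system properties
      .setAppName("Some App")
      .set("spark.kryo.registrator", "com.lordjoe.distributed.hydra.HydraKryoSerializer")
    val sc = new SparkContext(conf)

    // Application data: broadcast it rather than hiding it in the conf
    val userData = sc.broadcast("Some user Data")
    sc.parallelize(1 to 4).foreach { i =>
      // Runs on the executors; the broadcast value is available here
      println(i + " -> " + userData.value)
    }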
RE: problem with start-slaves.sh
hi Yana, in my case I did not start any spark worker. However, shark was definitely running. Do you think that might be a problem? I will take a look Thank you, From: Yana Kadiyska [yana.kadiy...@gmail.com] Sent: Wednesday, October 29, 2014 9:45 AM To: Pagliari, Roberto Cc: user@spark.apache.org Subject: Re: problem with start-slaves.sh I see this when I start a worker and then try to start it again forgetting it's already running (I don't use start-slaves, I start the slaves individually with start-slave.sh). All this is telling you is that there is already a running process on that machine. You can see it if you do a ps -aef|grep worker you can look on the spark UI and see if your master shows this machine as connected to it already. If it doesn't, you might want to kill the worker process and restart it. On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto rpagli...@appcomsci.commailto:rpagli...@appcomsci.com wrote: I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build with PHive option to be able to interface with hive) I’m getting this ip_address: org.apache.spark.deploy.worker.Worker running as process . Stop it first. Am I doing something wrong? In my specific case, shark+hive is running on the nodes. Does that interfere with spark? Thank you,
Re: SparkSQL: Nested Query error
You may use - select count(u_uid) from tusers a left outer join device b on (a.u_uid = b.d_uid) where b.d_uid is null On Wed, Oct 29, 2014 at 5:45 PM, SK skrishna...@gmail.com wrote: Hi, I am using Spark 1.1.0. I have the following SQL statement where I am trying to count the number of UIDs that are in the tusers table but not in the device table. val users_with_no_device = sql_cxt.sql(SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device)) I am getting the following error: Exception in thread main java.lang.RuntimeException: [1.61] failure: string literal expected SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device) I am not sure if every subquery has to be a string, so I tried to enclose the subquery as a string literal as follows: val users_with_no_device = sql_cxt.sql(SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device)) But that resulted in a compilation error. What is the right way to frame the above query in Spark SQL? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Nested-Query-error-tp17691.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
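Putting the suggested rewrite back into the original call (the whole statement is one ordinary quoted string; no nested string literal is needed):

    val users_with_no_device = sql_cxt.sql(
      "SELECT COUNT(u_uid) FROM tusers a " +
      "LEFT OUTER JOIN device b ON (a.u_uid = b.d_uid) " +
      "WHERE b.d_uid IS NULL")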
Algebird using spark-shell
I'm running into this error when I attempt to launch spark-shell passing in the algebird-core jar: ~~ $ ./bin/spark-shell --jars algebird-core_2.9.2-0.1.11.jar scala import com.twitter.algebird._ import com.twitter.algebird._ scala import HyperLogLog._ import HyperLogLog._ scala import com.twitter.algebird.HyperLogLogMonoid import com.twitter.algebird.HyperLogLogMonoid scala val hll = new HyperLogLogMonoid(12) java.lang.NoClassDefFoundError: scala/reflect/ClassManifest at com.twitter.algebird.HyperLogLogMonoid.init(HyperLogLog.scala:309) Is that the right jar to pass? Are there any non-streaming examples? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Algebird-using-spark-shell-tp17701.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
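That jar is built for Scala 2.9.2 (algebird-core_2.9.2), while Spark 1.x runs on Scala 2.10, where scala.reflect.ClassManifest no longer exists as a binary class; that version mismatch, not Algebird itself, is what produces the NoClassDefFoundError. Passing the Scala 2.10 build of Algebird should make plain batch (non-streaming) usage work. A hedged sketch, with the Algebird version and the userIds RDD as illustrative assumptions:

    // spark-shell --jars algebird-core_2.10-0.8.0.jar   (the 2.10 build of the same library)
    import com.twitter.algebird._

    val hll = new HyperLogLogMonoid(12)   // 2^12 registers
    // userIds: RDD[String] -- assumed; approximate the distinct count without streaming
    val approx = userIds.map(id => hll(id.getBytes("UTF-8"))).reduce(_ + _)
    println(approx.estimatedSize)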
MLLib: libsvm - default value initialization
Hi All, I have my sparse data in libsvm format. val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt") I am running linear regression. Let us say that my data has the following entry: 1 1:0 4:1 I think it will assume 0 for indices 2 and 3, right? I would like the default value to be 0.5 instead of 0. Is that possible? If not, I will have to switch to dense data, and that will significantly increase the data size for me.
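Right: LIBSVM format only stores the entries that are present, and loadLibSVMFile treats absent indices as 0.0; there is no option to change that default. If 0.5 really has to be the default, a hedged sketch of densifying after loading (accepting the loss of sparsity), using the sparse vector's stored indices so that an explicitly stored "1:0" stays 0 while truly absent features become 0.5:

    import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
    import org.apache.spark.mllib.regression.LabeledPoint

    val densified = examples.map { lp =>
      val values = Array.fill(lp.features.size)(0.5)   // desired default everywhere
      lp.features match {
        case sv: SparseVector =>
          // overwrite only the indices that were actually present in the file
          sv.indices.zip(sv.values).foreach { case (i, v) => values(i) = v }
        case dv => dv.toArray.copyToArray(values)
      }
      LabeledPoint(lp.label, Vectors.dense(values))
    }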
Re: Java api overhead?
Thanks Koert. These numbers indeed tie back to our data and algorithms. Would going the scala route save some memory, as the java API creates wrapper Tuple2 for all pair functions? On Wednesday, October 29, 2014, Koert Kuipers ko...@tresata.com wrote: since spark holds data structures on heap (and by default tries to work with all data in memory) and its written in Scala seeing lots of scala Tuple2 is not unexpected. how do these numbers relate to your data size? On Oct 27, 2014 2:26 PM, Sonal Goyal sonalgoy...@gmail.com javascript:_e(%7B%7D,'cvml','sonalgoy...@gmail.com'); wrote: Hi, I wanted to understand what kind of memory overheads are expected if at all while using the Java API. My application seems to have a lot of live Tuple2 instances and I am hitting a lot of gc so I am wondering if I am doing something fundamentally wrong. Here is what the top of my heap looks like. I actually create reifier.tuple.Tuple objects and pass them to map methods and mostly return Tuple2Tuple,Tuple. The heap seems to have far too many Tuple2 and $colon$colon. num #instances #bytes class name -- 1: 85414872 2049956928 scala.collection.immutable.$colon$colon 2: 85414852 2049956448 scala.Tuple2 3:304221 14765832 [C 4:3029237270152 java.lang.String 5: 441112624624 [Ljava.lang.Object; 6: 12101495256 [B 7: 39839 956136 java.util.ArrayList 8:29 950736 [Lscala.concurrent.forkjoin.ForkJoinTask; 9: 8129 827792 java.lang.Class 10: 33839 812136 java.lang.Long 11: 33400 801600 reifier.tuple.Tuple 12: 6116 538208 java.lang.reflect.Method 13: 12767 408544 java.util.concurrent.ConcurrentHashMap$Node 14: 5994 383616 org.apache.spark.scheduler.ResultTask 15: 10298 329536 java.util.HashMap$Node 16: 11988 287712 org.apache.spark.rdd.NarrowCoGroupSplitDep 17: 5708 228320 reifier.block.Canopy 18: 9 215784 [Lscala.collection.Seq; 19: 12078 193248 java.lang.Integer 20: 12019 192304 java.lang.Object 21: 5708 182656 reifier.block.Tree 22: 2776 173152 [I 23: 6013 144312 scala.collection.mutable.ArrayBuffer 24: 5994 143856 [Lorg.apache.spark.rdd.CoGroupSplitDep; 25: 5994 143856 org.apache.spark.rdd.CoGroupPartition 26: 5994 143856 org.apache.spark.rdd.ShuffledRDDPartition 27: 4486 143552 java.util.Hashtable$Entry 28: 6284 132800 [Ljava.lang.Class; 29: 1819 130968 java.lang.reflect.Field 30: 605 101208 [Ljava.util.HashMap$Node; Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal -- Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal
Re: Spark Worker node accessing Hive metastore
Thanks Akhil. So the worker spark node doesn't need access to metastore to run Hive queries? If yes, which component accesses the metastore? For Hive, the Hive-cli accesses the metastore before submitting M/R jobs. Thanks, Ken -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-node-accessing-Hive-metastore-tp17255p17704.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org