Re: A question about streaming throughput
Hm, is this not just showing that you're rate-limited by how fast you can get events to the cluster? You have more of a network bottleneck between the data source and the cluster in the cloud than with your local cluster.

On Tue, Oct 14, 2014 at 9:44 PM, danilopds danilob...@gmail.com wrote:

Hi, I'm learning about Apache Spark Streaming and I'm doing some tests. I now have a modified version of the NetworkWordCount app that performs a reduceByKeyAndWindow with a window of 10 seconds at 5-second intervals. I'm also measuring the rate of records/second like this:

words.foreachRDD(rdd => {
  val count = rdd.count()
  println("Current rate: " + (count / 1) + " records/second")
})

Then, on my computer with 4 cores and 8 GB (running local[4]) I get this average result: Current rate: 130 000. Running locally with my computer as master and worker I get: Current rate: 25 000. And running on an Azure cloud instance with 4 cores and 7 GB, the result is: Current rate: 10 000.

I read the Spark Streaming paper (http://www.eecs.berkeley.edu/~matei/papers/2013/sosp_spark_streaming.pdf) and the performance evaluation for a similar application was 250 000 records/second. To send data to the socket I'm using an application similar to this one: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-td3431.html#a13814

So, can anyone suggest something to improve this rate? (I increased the executor memory and didn't get better results.) Thanks!
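For reference, a minimal, self-contained sketch (not the poster's exact setup) of measuring the per-batch ingest rate against the batch interval; the host, port and 5-second interval are illustrative assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val batchIntervalSeconds = 5
val ssc = new StreamingContext(new SparkConf().setAppName("rate").setMaster("local[4]"), Seconds(batchIntervalSeconds))
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Report records/second normalized by the batch interval rather than a fixed divisor.
words.foreachRDD { rdd =>
  val count = rdd.count()
  println("Current rate: " + (count / batchIntervalSeconds) + " records/second")
}

ssc.start()
ssc.awaitTermination()

If the batch interval is changed, only the constant needs to change for the reported rate to stay meaningful.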
Re: Spark Streaming Empty DStream / RDD and reduceByKey
The problem is not ReduceWords, since it is already Serializable by implementing Function2. Indeed the error tells you just what is unserializable: KafkaStreamingWordCount, your driver class. Something is causing a reference to the containing class to be serialized in the closure. The best fix is to not do this. Usually the culprit is an inner class, possibly anonymous, that is non-static. These contain a hidden reference to the containing class, through which you may be referring to one of its members. If not, it's still possible the closure cleaner isn't removing the reference even though it could. Is ReduceWords actually an inner class? Or, on another tangent: when you remove reduceByKey, you are also removing print? That would cause it to do nothing, which of course generates no error.

On Wed, Oct 15, 2014 at 12:11 AM, Abraham Jacob abe.jac...@gmail.com wrote:

Hi All, I am trying to understand what is going on in my simple WordCount Spark Streaming application. Here is the setup - I have a Kafka producer that is streaming words (lines of text). On the flip side, I have a Spark Streaming application that uses the high-level Kafka/Spark connector to read in these messages from the Kafka topic. The code is straightforward - using the CDH 5.1.3 distribution and submitting the job to a YARN cluster:

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "20");
kafkaConf.put("rebalance.backoff.ms", "3");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class,
      kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
        new PairFunction<Tuple2<byte[], String>, byte[], String>() {
          private static final long serialVersionUID = -1936810126415608167L;
          public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2;
          }
        }));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
  unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
  unifiedStream = kafkaStreams.get(0);
}

JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();

jssc.start();
jssc.awaitTermination();
return 0;

If I remove the (highlighted) line JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());, the application works just fine...
The moment I introduce the reduceByKey, I start getting the following error and Spark Streaming shuts down - 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) . . 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2 java.util.NoSuchElementException: key not found: Stage 2 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at
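For readers hitting the same NotSerializableException, a small illustrative sketch (in Scala, not the poster's Java code) of how a closure can drag in its enclosing, non-serializable class, and the usual fix of copying the needed member into a local value first:

import org.apache.spark.rdd.RDD

class Driver {                                    // not Serializable
  val separator = ","

  // Referring to a field goes through `this`, so the whole Driver instance
  // must be serialized with the task -> Task not serializable.
  def bad(lines: RDD[String]): RDD[Array[String]] =
    lines.map(line => line.split(separator))

  // Copying the field into a local val means only the String is captured.
  def good(lines: RDD[String]): RDD[Array[String]] = {
    val sep = separator
    lines.map(line => line.split(sep))
  }
}

In Java the equivalent fix is to make helper classes static (or top-level) and to avoid touching fields of the outer class from inside anonymous functions.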
adding element into MutableList throws an error type mismatch
Hi All, Could someone shed some light on why adding an element to a MutableList can result in a type mismatch, even though I'm sure the class type is right? Below is the sample code I ran in the Spark 1.0.2 console; at the end there is a type mismatch error:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.0.2
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
14/10/15 14:36:39 INFO spark.SecurityManager: Changing view acls to: hadoop
14/10/15 14:36:39 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop)
14/10/15 14:36:39 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/15 14:36:39 INFO Remoting: Starting remoting
14/10/15 14:36:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@fphd4.ctpilot1.com:35293]
14/10/15 14:36:39 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@fphd4.ctpilot1.com:35293]
14/10/15 14:36:39 INFO spark.SparkEnv: Registering MapOutputTracker
14/10/15 14:36:39 INFO spark.SparkEnv: Registering BlockManagerMaster
14/10/15 14:36:39 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20141015143639-c62e
14/10/15 14:36:39 INFO storage.MemoryStore: MemoryStore started with capacity 294.4 MB.
14/10/15 14:36:39 INFO network.ConnectionManager: Bound socket to port 43236 with id = ConnectionManagerId(fphd4.ctpilot1.com,43236)
14/10/15 14:36:39 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/10/15 14:36:39 INFO storage.BlockManagerInfo: Registering block manager fphd4.ctpilot1.com:43236 with 294.4 MB RAM
14/10/15 14:36:39 INFO storage.BlockManagerMaster: Registered BlockManager
14/10/15 14:36:39 INFO spark.HttpServer: Starting HTTP Server
14/10/15 14:36:39 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/10/15 14:36:40 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:37164
14/10/15 14:36:40 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.18.30.154:37164
14/10/15 14:36:40 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-34fc70ab-7c5d-4e79-9ae7-929fd47d4f36
14/10/15 14:36:40 INFO spark.HttpServer: Starting HTTP Server
14/10/15 14:36:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/10/15 14:36:40 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:47025
14/10/15 14:36:40 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/10/15 14:36:40 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/10/15 14:36:40 INFO ui.SparkUI: Started SparkUI at http://fphd4.ctpilot1.com:4040
14/10/15 14:36:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/10/15 14:36:40 INFO executor.Executor: Using REPL class URI: http://10.18.30.154:49669
14/10/15 14:36:40 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
scala> case class Dummy(x: String) {
     |   val data: String = x
     | }
defined class Dummy

scala> import scala.collection.mutable.MutableList
import scala.collection.mutable.MutableList

scala> val v = MutableList[Dummy]()
v: scala.collection.mutable.MutableList[Dummy] = MutableList()

scala> v += (new Dummy("a"))
<console>:16: error: type mismatch;
 found   : Dummy
 required: Dummy
       v += (new Dummy("a"))
Re: adding element into MutableList throws an error type mismatch
Another instance of https://issues.apache.org/jira/browse/SPARK-1199, fixed in subsequent versions.

On Wed, Oct 15, 2014 at 7:40 AM, Henry Hung ythu...@winbond.com wrote: [...]
Re: Initial job has not accepted any resources when launching SparkPi example on a worker.
Can anyone help me, please?

On 10/14/2014 9:58 PM, Theodore Si wrote:

Hi all, I have two nodes, one as master (host1) and the other as worker (host2). I am using the standalone mode. After starting the master on host1, I run

$ export MASTER=spark://host1:7077
$ bin/run-example SparkPi 10

on host2, but I get this:

14/10/14 21:54:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

And it repeats again and again. How can I fix this? Best Regards, Theo
Re: Default spark.deploy.recoveryMode
[Removing dev lists] You are absolutely correct about that. Prashant Sharma

On Tue, Oct 14, 2014 at 5:03 PM, Priya Ch learnings.chitt...@gmail.com wrote:

Hi Spark users/experts, In the Spark source code (Master.scala and Worker.scala), when registering the worker with the master, I see the usage of persistenceEngine. When we don't specify spark.deploy.recoveryMode explicitly, what is the default value used? This recovery mode is used to persist and restore the application and worker details. I see that when the recovery mode is not specified explicitly, BlackHolePersistenceEngine is used. Am I right? Thanks, Padma Ch
Re: How to create Track per vehicle using spark RDD
You say you reduceByKey, but are you really collecting all the tuples for a vehicle in a collection, like what groupByKey does already? Yes, if one vehicle has a huge amount of data that could fail. Otherwise perhaps you are simply not increasing memory from the default. Maybe you can consider using something like vehicle and *day* as a key. This would make you process each day of data separately, but if that's fine for you, it might drastically cut down the data associated with a single key. Spark Streaming has a windowing function, and there is a window function for an entire RDD, but I am not sure there is support for a 'window by key' anywhere. You can perhaps get your direct approach of collecting events working with some of the changes above. Otherwise I think you have to roll your own to some extent, creating the overlapping buckets of data, which will mean mapping the data to several copies of itself. This might still be quite feasible depending on how big a lag you are thinking of. PS for the interested, this is what LAG is: http://www.oracle-base.com/articles/misc/lag-lead-analytic-functions.php#lag

On Wed, Oct 15, 2014 at 1:37 AM, Manas Kar manasdebashis...@gmail.com wrote:

Hi, I have an RDD containing Vehicle Number, timestamp, Position. I want to get the lag function equivalent for my RDD to be able to create track segments for each vehicle. Any help? PS: I have tried reduceByKey and then splitting the list of positions into tuples. For me it runs out of memory every time because of the volume of data. ...Manas For some reason I have never got any reply to my emails to the user group. I am hoping to break that trend this time. :)
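A minimal sketch (not from the thread) of the key-by-vehicle-and-day idea described above, assuming records of (vehicle, timestamp, position) and that one vehicle-day of points fits in memory; Position, dayOf and the sample data are invented for the example:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

case class Position(lat: Double, lon: Double)

val sc = new SparkContext(new SparkConf().setAppName("segments").setMaster("local[2]"))
def dayOf(ts: Long): Long = ts / 86400000L   // placeholder: bucket timestamps by day

val records = sc.parallelize(Seq(
  ("v1", 1000L, Position(44.0, -63.0)),
  ("v1", 2000L, Position(44.1, -63.1)),
  ("v1", 3000L, Position(44.2, -63.2))))

// Key by (vehicle, day), gather each group's points, sort by time, and pair
// adjacent points to form track segments.
val segments = records
  .map { case (vehicle, ts, pos) => ((vehicle, dayOf(ts)), (ts, pos)) }
  .groupByKey()
  .flatMapValues { points =>
    points.toSeq.sortBy(_._1)
      .sliding(2)
      .collect { case Seq((_, p1), (_, p2)) => (p1, p2) }
      .toSeq
  }

This keeps a whole vehicle's history out of memory at the cost of losing the one segment that spans midnight, which may or may not matter for the density use case.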
Re: submitted uber-jar not seeing spark-assembly.jar at worker
How did you recompile and deploy Spark to your cluster? It sounds like a problem with not getting the assembly deployed correctly, rather than with your app.

On Tue, Oct 14, 2014 at 10:35 PM, Tamas Sandor tsan...@gmail.com wrote:

Hi, I'm a rookie in Spark, but hope someone can help me out. I'm writing an app that I'm submitting to my spark-master, which has a worker on a separate node. It uses spark-cassandra-connector, and since that depends on guava v16 and conflicts with the default spark-1.1.0-assembly's guava v14.1, I built the latest from the Spark git master (it was fixed in late September), so now I have a working spark-assembly-1.2.0-SNAPSHOT-hadoop2.4.0 running. I have my uber-jar with hadoop-client and spark-assembly as scope:provided, excluded from the deployed jar, and then it gets submitted to the spark-master from the node. From the logs I see the TaskSetManager throws an error coming from my worker node saying java.lang.NoClassDefFoundError: org/apache/spark/Partition - I guess that's expected since my jar has no Spark deps inline (uber), but why can it not see the worker's classpath - isn't that what provided scope would mean here? How can I fix that? Am I missing something obvious? Thank you for your help. Regards, Tamas
Re: system.out.println with --master yarn-cluster
Examine the output (replace $YARN_APP_ID in the following with the application identifier output by the previous command). Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.

$ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_01/stdout

Regards, Vishnu
Re: Spark can't find jars
Hi Jimmy, Did you try my patch? The problem on my side was that hadoop.tmp.dir (in hadoop core-site.xml) was not handled properly by Spark when it is set on multiple partitions/disks, i.e.:

<property>
  <name>hadoop.tmp.dir</name>
  <value>file:/d1/yarn/local,file:/d2/yarn/local,file:/d3/yarn/local,file:/d4/yarn/local,file:/d5/yarn/local,file:/d6/yarn/local,file:/d7/yarn/local</value>
</property>

Hence, you won't be hit by this bug if your hadoop.tmp.dir is set on one partition only. If your hadoop.tmp.dir is also set on several partitions, I agree that it looks like a bug in Spark. Christophe.

On 14/10/2014 18:50, Jimmy McErlain wrote:

So the only way that I could make this work was to build a fat jar file, as suggested earlier. To me (and I am no expert) it seems like this is a bug. Everything was working for me prior to our upgrade to Spark 1.1 on Hadoop 2.2, but now it seems not to, i.e. packaging my jars locally, then pushing them out to the cluster and pointing them at the corresponding dependent jars. Sorry I cannot be more help! J

JIMMY MCERLAIN, DATA SCIENTIST (NERD), E: ji...@sellpoints.com, M: 510.303.7751

On Tue, Oct 14, 2014 at 4:59 AM, Christophe Préaud christophe.pre...@kelkoo.com wrote:

Hello, I have already posted a message with the exact same problem, and proposed a patch (the subject is "Application failure in yarn-cluster mode"). Can you test it and see if it works for you? I would also be glad if someone could confirm that it is a bug in Spark 1.1.0. Regards, Christophe.

On 14/10/2014 03:15, Jimmy McErlain wrote:

BTW this has always worked for me before, until we upgraded the cluster to Spark 1.1.1... J

On Mon, Oct 13, 2014 at 5:39 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote:

Hello, Can you check if the jar file is available in the target/scala-2.10 folder? When you use sbt package to make the jar file, that is where the jar file would be located. The following command works well for me:

spark-submit --class "Classname" --master yarn-cluster jarfile (with complete path)

Can you try checking with this initially and later add other options?

On Mon, Oct 13, 2014 at 7:36 PM, Jimmy ji...@sellpoints.com wrote:

Having the exact same error with the exact same jar... Do you work for Altiscale? :) J Sent from my iPhone

On Oct 13, 2014, at 5:33 PM, Andy Srine andy.sr...@gmail.com wrote:

Hi Guys, Spark rookie here. I am getting a file not found exception on the --jars. This is in yarn-cluster mode and I am running the following command on our recently upgraded Spark 1.1.1 environment.
./bin/spark-submit --verbose --master yarn --deploy-mode cluster --class myEngine --driver-memory 1g --driver-library-path /hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-0.4.18-201406111750.jar --executor-memory 5g --executor-cores 5 --jars /home/andy/spark/lib/joda-convert-1.2.jar --queue default --num-executors 4 /home/andy/spark/lib/my-spark-lib_1.0.jar

This is the error I am hitting. Any tips would be much appreciated. The file permissions look fine on my local disk.

14/10/13 22:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED
14/10/13 22:49:39 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
Exception in thread Driver java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 12, 122-67.vb2.company.com): java.io.FileNotFoundException: ./joda-convert-1.2.jar (Permission
Unit testing jar request
Hi, we are Spark users and we use some of Spark's test classes for our own application unit tests. We use LocalSparkContext and SharedSparkContext, but these classes are not included in the spark-core library. That is a good thing in itself, as it's not a good idea to include test classes in the runtime jar. Still, would it be possible for the Spark team to publish the test jar of the spark-core module to the Maven repository? If I understand correctly, it's just a plugin to add to the spark/core/pom.xml file, as described here: http://maven.apache.org/plugins/maven-jar-plugin/examples/create-test-jar.html Thanks, Jean-Charles
Spark on secure HDFS
Hi, We really would like to use Spark but we can't because we have a secure HDFS environment (Cloudera). I understand that https://issues.apache.org/jira/browse/SPARK-2541 contains a patch. Can one of the committers please take a look? Thanks! Erik. — Erik van Oosten http://www.day-to-day-stuff.blogspot.com/
Spark Concepts
Hi, I'm pretty new to both Big Data and Spark. I've just started POC work on Spark, and my team and I are evaluating it against other in-memory computing tools such as GridGain, BigMemory, Aerospike and some others, specifically to solve two sets of problems.

1) Data storage: Our current application runs on a single node with a heavy configuration of 24 cores and 350 GB. The application loads all the datamart data, including multiple cubes, into memory, converts it, and keeps it in a Trove collection in the form of a key/value map. This is an immutable collection which takes about 15-20 GB of memory. We anticipate that the data will grow 10-15 fold in the next year or so, and we are not very confident that Trove can scale to that level.

2) Compute: Ours is a natively analytical application doing predictive analytics with lots of simulations and optimizations of scenarios. At the heart of all this are the Trove collections, over which we run our mathematical algorithms to calculate the end result; in doing so, the memory consumption of the application goes beyond 250-300 GB. This is because of the many intermediate computing results (collections), which are further broken down to a granular level and then looked up in the Trove collection. All of this happens on a single node, which obviously starts to perform slowly over time. Given the large volume of data arriving in the next year or so, our current architecture will not be able to handle such a massive in-memory data set and the computing power it needs. Hence we are planning to move to a cluster-based, in-memory, distributed computing architecture. We are evaluating all these products along with Apache Spark. We were very excited by Apache Spark based on the videos and some online resources, but when it came down to hands-on work we are facing lots of issues.

1) What are the Standalone cluster's limitations? Can I configure a cluster on a single node with multiple processes for worker nodes, executors, etc.? Is this supported even though the IP address would be the same?

2) Why so many Java processes? Why are there so many Java processes (worker nodes, executors)? Will the communication between them not slow down the performance as a whole?

3) How is parallelism on partitioned data achieved? This one is really important for us to understand, since we are doing our benchmarking on partitioned data, and we do not know how to configure partitions in Spark; any help here would be appreciated. We want to partition the data present in cubes, hence we want each cube to be a separate partition.

4) What is the difference between multiple nodes executing jobs and multiple tasks executing jobs? How do these handle partitioning and parallelism?

Help with these questions would be really appreciated, to get a better sense of Apache Spark. Thanks, Nitin
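Not part of the original question, but regarding point 3, a minimal sketch of the usual ways to control partitioning in Spark; the numbers, key function and path are only illustrative:

import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}
import org.apache.spark.SparkContext._

val sc = new SparkContext(new SparkConf().setAppName("partition-demo").setMaster("local[4]"))

// Choose the number of partitions when creating an RDD...
val nums = sc.parallelize(1 to 1000000, 8)              // 8 partitions
// val lines = sc.textFile("hdfs:///data/cubes", 16)    // at least 16 partitions (path is illustrative)

// ...or re-partition an existing one, e.g. by a key such as a cube id.
val byCube = nums.map(x => (x % 4, x))                  // pretend x % 4 is a cube id
  .partitionBy(new HashPartitioner(4))                  // co-locate each "cube" in its own partition
println(byCube.partitions.length)                       // => 4

Each partition is processed by one task, so the number of partitions is what bounds the parallelism of a stage.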
Re: Spark output to s3 extremely slow
Hi, How large is the dataset you're saving into S3? Saving to S3 is actually done in two steps: 1) writing temporary files, 2) committing them to the proper directory. Step 2 can be slow because S3 does not have a quick atomic move operation; you have to copy (server side, but it still takes time) and then delete the original. I've overcome this by using a JobConf with a NullOutputCommitter:

jobConf.setOutputCommitter(classOf[NullOutputCommitter])

where NullOutputCommitter is a class that doesn't do anything:

class NullOutputCommitter extends OutputCommitter {
  def abortTask(taskContext: TaskAttemptContext) = { }
  override def cleanupJob(jobContext: JobContext) = { }
  def commitTask(taskContext: TaskAttemptContext) = { }
  def needsTaskCommit(taskContext: TaskAttemptContext) = { false }
  def setupJob(jobContext: JobContext) { }
  def setupTask(taskContext: TaskAttemptContext) { }
}

This works, but maybe someone has a better solution. /Raf

anny9699 wrote:

Hi, I found that writing output back to S3 using rdd.saveAsTextFile() is extremely slow, much slower than reading from S3. Is there a way to make it faster? The RDD has 150 partitions, so parallelism is enough, I assume. Thanks a lot! Anny
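For context, a minimal sketch of how such a committer could be wired into a save, assuming the NullOutputCommitter above extends org.apache.hadoop.mapred.OutputCommitter and that rdd is the RDD of strings being saved; the bucket name and output format are placeholders, not from the thread:

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.spark.SparkContext._

// Build a JobConf carrying the no-op committer and pass it to saveAsHadoopFile.
val jobConf = new JobConf(rdd.context.hadoopConfiguration)
jobConf.setOutputCommitter(classOf[NullOutputCommitter])

rdd.map(line => (NullWritable.get(), new Text(line)))
   .saveAsHadoopFile("s3n://some-bucket/output",
     classOf[NullWritable], classOf[Text],
     classOf[TextOutputFormat[NullWritable, Text]], jobConf)

Note this trades away the safety a committer normally provides (clean handling of failed or speculative tasks), so it is a judgment call rather than a drop-in replacement for saveAsTextFile.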
[SparkSQL] Convert JavaSchemaRDD to SchemaRDD
I don't know why JavaSchemaRDD.baseSchemaRDD is private[sql]. And I found that DataTypeConversions is protected[sql]. Finally I found this solution:

jrdd.registerTempTable("transform_tmp")
jrdd.sqlContext.sql("select * from transform_tmp")

Could anyone tell me: is it a good idea for me to use Catalyst as my DSL's execution engine? I am trying to build a DSL, and I want to confirm this.
Re: Default spark.deploy.recoveryMode
Which means the details are not persisted, and hence after any failures the worker and master daemons wouldn't start normally... right?

On Wed, Oct 15, 2014 at 12:17 PM, Prashant Sharma [via Apache Spark User List] wrote:

[Removing dev lists] You are absolutely correct about that. Prashant Sharma

On Tue, Oct 14, 2014 at 5:03 PM, Priya Ch wrote: [...]
Re: Default spark.deploy.recoveryMode
So if you need those features you can go ahead and set up one of the FILESYSTEM or ZooKeeper options. Please take a look at http://spark.apache.org/docs/latest/spark-standalone.html. Prashant Sharma

On Wed, Oct 15, 2014 at 3:25 PM, Chitturi Padma learnings.chitt...@gmail.com wrote:

which means the details are not persisted and hence after any failures the worker and master daemons wouldn't start normally... right? [...]
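For reference, a small sketch of what the standalone docs describe for single-node FILESYSTEM recovery; the directory path here is only an example:

# spark-env.sh on the master node
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/var/spark/recovery"

The ZooKeeper-based mode is configured the same way with spark.deploy.recoveryMode=ZOOKEEPER plus spark.deploy.zookeeper.url, and is the option to prefer when automatic master failover is needed rather than just restart recovery.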
Re: Spark Streaming: Sentiment Analysis of Twitter streams
I just ran the same code and it is running perfectly fine on my machine. These are the things on my end:

- Spark version: 1.1.0
- Gave the full path to the negative and positive files
- Set the twitter auth credentials in the environment.

And here's the code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Sentimenter {
  def main(args: Array[String]) {
    System.setProperty("twitter4j.oauth.consumerKey", "XXXX")
    System.setProperty("twitter4j.oauth.consumerSecret", "XXXX")
    System.setProperty("twitter4j.oauth.accessToken", "XXXX")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "XXXX")

    val filters = new Array[String](2)
    filters(0) = "ebola"
    filters(1) = "isis"

    val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // get the list of positive words
    val pos_list = sc.textFile("file:///home/akhld/positive-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // get the list of negative words
    val neg_list = sc.textFile("file:///home/akhld/negative-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // create twitter stream
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    val tweets = stream.map(r => r.getText)
    tweets.print() // print tweet text

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks, Best Regards

On Wed, Oct 15, 2014 at 1:43 AM, SK skrishna...@gmail.com wrote:

Hi, I am trying to implement simple sentiment analysis of Twitter streams in Spark/Scala. I am getting an exception, and it appears when I combine SparkContext with StreamingContext in the same program. When I read the positive and negative words using only SparkContext.textFile (without creating a StreamingContext) and analyze static text files, the program works. Likewise, when I just create the twitter stream using StreamingContext (and don't create a SparkContext to build the vocabulary), the program works. The exception seems to appear when I combine both SparkContext and StreamingContext in the same program, and I am not sure whether we are not allowed to have both simultaneously. All the examples in the streaming module contain only the StreamingContext. The error transcript and my code appear below. I would appreciate your guidance in fixing this error and on the right way to read static files and streams in the same program, or any pointers to relevant examples. Thanks.
--Error transcript--

Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745)

--My code below--

object TweetSentiment {
  def main(args: Array[String]) {
    val filters = args
    val sparkConf = new SparkConf().setAppName("TweetSentiment")
    val sc = new SparkContext(sparkConf)

    // get the list of positive words
    val pos_list = sc.textFile("positive-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // get the list of negative words
    val neg_list = sc.textFile("negative-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // create twitter stream
    val ssc = new
Re: How to make operations like cogroup(), groupByKey() on pair RDD = [ [ ], [ ], [ ] ]
What results do you want? If your pair is like (a, b), where a is the key and b is the value, you can try rdd1 = rdd1.flatMap(lambda l: l) and then use cogroup. Best, Gen
Re: Spark Streaming: Sentiment Analysis of Twitter streams
Hi, I am using 1.1.0. I did set my twitter credentials and I am using the full path; I did not paste these in the public post. I am running on a cluster and getting the exception. Are you running in local or standalone mode? Thanks

On Oct 15, 2014 3:20 AM, Akhil Das ak...@sigmoidanalytics.com wrote: I just ran the same code and it is running perfectly fine on my machine. [...]
Re: Spark Streaming: Sentiment Analysis of Twitter streams
I ran it in both local and standalone mode, and it worked for me. It does throw a bind exception, which is normal since we are using both SparkContext and StreamingContext. Thanks, Best Regards

On Wed, Oct 15, 2014 at 5:25 PM, S Krishna skrishna...@gmail.com wrote: Hi, I am using 1.1.0. I did set my twitter credentials and I am using the full path; I did not paste these in the public post. I am running on a cluster and getting the exception. Are you running in local or standalone mode? [...]
Re: jsonRDD: NoSuchMethodError
How did you resolve it?

On Tue, Jul 15, 2014 at 3:50 AM, SK skrishna...@gmail.com wrote: The problem is resolved. Thanks.
SparkSQL: set hive.metastore.warehouse.dir in CLI doesn't work
Hi, The following query in the Spark SQL 1.1.0 CLI doesn't work:

SET hive.metastore.warehouse.dir=/home/spark/hive/warehouse;
create table test as
select v1.*, v2.card_type, v2.card_upgrade_time_black, v2.card_upgrade_time_gold
from customer v1 left join customer_loyalty v2
on v1.account_id = v2.account_id
limit 5;

Stack trace:

org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:file:/user/hive/warehouse/test is not a directory or unable to create one) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:602) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:559) at org.apache.spark.sql.hive.HiveMetastoreCatalog.createTable(HiveMetastoreCatalog.scala:99) at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:116) at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:111) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156) at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$.apply(HiveMetastoreCatalog.scala:111) at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358) at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:58) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:291) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: MetaException(message:file:/user/hive/warehouse/test is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1060) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1107) at sun.reflect.GeneratedMethodAccessor133.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103) at com.sun.proxy.$Proxy15.create_table_with_environment_context(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:482) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:471) at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at com.sun.proxy.$Proxy16.createTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:596) ... 30 more

It seems that the CLI doesn't take the hive.metastore.warehouse.dir value into account when creating a table with "create table ... as select ...". If I just create the table, like "create table t (...)", and then load
Problem executing Spark via JBoss application
Hi, I have a Spark standalone example application which is working fine. I'm now trying to integrate this application into a J2EE application, deployed on JBoss 7.1.1 and accessed via a web service. The JBoss server is installed on my local machine (Windows 7) and the master Spark is remote (Linux). The example simply executes a count on my RDD. When I call the webservice I'm getting the following error at JBoss side when executing the count: 11:48:10,232 ERROR [org.apache.catalina.core.ContainerBase.[jboss.web].[default-host].[/el2-etrm-spark].[ws]] (http--127.0.0.1-8082-3) Servlet.service() pour la servlet ws a généré une exception: java.lang.RuntimeException: org.apache.cxf.interceptor.Fault: Job cancelled because SparkContext was shut down at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:116) [cxf-api-2.6.9.jar:2.6.9] at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:322) [cxf-api-2.6.9.jar:2.4.3] at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) [cxf-api-2.6.9.jar:2.6.9] at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:130) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:221) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doGet(AbstractHTTPServlet.java:146) [cxf-bundle-2.6.2.jar:2.6.2] at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) [jboss-servlet-api_3.0_spec-1.0.0.Final.jar:1.0.0.Final] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:197) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:329) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter.doFilterInternal(OpenEntityManagerInViewFilter.java:180) [spring-orm-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:186) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE] at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:259) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:275) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:161) [jbossweb-7.0.13.Final.jar:] at org.jboss.as.jpa.interceptor.WebNonTxEmCloserValve.invoke(WebNonTxEmCloserValve.java:50) [jboss-as-jpa-7.1.1.Final.jar:7.1.1.Final] at org.jboss.as.web.security.SecurityContextAssociationValve.invoke(SecurityContextAssociationValve.java:153) [jboss-as-web-7.1.1.Final.jar:7.1.1.Final] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:155) [jbossweb-7.0.13.Final.jar:] at
How to close resources shared in executor?
In order to share an HBase connection pool, we create an object: object Util { val HBaseConf = HBaseConfiguration.create; val Connection = HConnectionManager.createConnection(HBaseConf) } which would be shared among tasks on the same executor, e.g. val result = rdd.map(line => { val table = Util.Connection.getTable("user") ... }). However, we don't know how to close the Util.Connection. If we write Util.Connection.close() in the main function, it'll only run on the driver, not on the executors. So, how do we make sure every Connection is closed before exit?
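Not from this thread, but one common workaround is to register a JVM shutdown hook inside the shared object, so the connection is closed when each executor JVM exits. A minimal Scala sketch, assuming the HBase 0.98 client API used above:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HConnectionManager

object Util {
  val HBaseConf = HBaseConfiguration.create
  val Connection = HConnectionManager.createConnection(HBaseConf)

  // Runs once per executor JVM when it shuts down, closing the shared connection.
  sys.addShutdownHook {
    Connection.close()
  }
}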
Re: How to create Track per vehicle using spark RDD
It is wonderful to see some ideas. Now the questions: 1) What is a track segment? Ans) It is the line that contains two adjacent points when all points are arranged by time. Say a vehicle moves (t1, p1) -> (t2, p2) -> (t3, p3). Then the segments are (p1, p2), (p2, p3) when the time ordering is (t1 < t2 < t3). 2) What is the Lag function? Ans) Sean's link explains it. A little bit more on my requirement: what I need to calculate is a density map of vehicles in a certain area. Because of a user-specific requirement I can't use just points; I will have to use segments. I already have a gridRDD containing 1km polygons for the whole world. My approach is: 1) create a tracksegmentRDD of (Vehicle, segment) 2) do a cartesian of tracksegmentRDD and gridRDD and for each row check if the segment intersects the polygon. If it does, count it as 1. 3) Group the result above by vehicle (probably reduceByKey(_ + _)) to get the density map. I am checking an issue http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-td12621.html which seems to have some potential. I will give it a try. ..Manas On Wed, Oct 15, 2014 at 2:55 AM, sowen [via Apache Spark User List] ml-node+s1001560n16471...@n3.nabble.com wrote: You say you reduceByKey but are you really collecting all the tuples for a vehicle in a collection, like what groupByKey does already? Yes, if one vehicle has a huge amount of data that could fail. Otherwise perhaps you are simply not increasing memory from the default. Maybe you can consider using something like vehicle and *day* as a key. This would make you process each day of data separately, but if that's fine for you, might drastically cut down the data associated to a single key. Spark Streaming has a windowing function, and there is a window function for an entire RDD, but I am not sure if there is support for a 'window by key' anywhere. You can perhaps get your direct approach of collecting events working with some of the changes above. Otherwise I think you have to roll your own to some extent, creating the overlapping buckets of data, which will mean mapping the data to several copies of itself. This might still be quite feasible depending on how big a lag you are thinking of. PS for the interested, this is what LAG is: http://www.oracle-base.com/articles/misc/lag-lead-analytic-functions.php#lag On Wed, Oct 15, 2014 at 1:37 AM, Manas Kar [hidden email] wrote: Hi, I have an RDD containing (Vehicle Number, timestamp, Position). I want to get the lag function equivalent for my RDD to be able to create track segments for each vehicle. Any help? PS: I have tried reduceByKey and then splitting the List of positions into tuples. For me it runs out of memory every time because of the volume of data. ...Manas For some reason I have never got any reply to my emails to the user group. I am hoping to break that trend this time.
:) -- Manas Kar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Lag-function-equivalent-in-an-RDD-tp16448p16498.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
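A rough sketch of the segment-building step being discussed (not code from the thread): group points per vehicle, sort each group by timestamp, and pair up adjacent points with sliding. The names (Point, toSegments) are made up for illustration, and it assumes the points for one key fit in memory, which is exactly why Sean suggests keying by (vehicle, day) if a single vehicle has too much data.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

case class Point(lat: Double, lon: Double)

// (vehicleId, timestamp, position) -> (vehicleId, (p1, p2)) track segments
def toSegments(records: RDD[(String, Long, Point)]): RDD[(String, (Point, Point))] = {
  records
    .map { case (veh, t, p) => (veh, (t, p)) }
    .groupByKey()
    .flatMapValues { points =>
      points.toSeq
        .sortBy(_._1)                        // order the vehicle's points by timestamp
        .map(_._2)                           // keep only the positions
        .sliding(2)                          // adjacent point pairs = track segments
        .collect { case Seq(a, b) => (a, b) }
    }
}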
Re: How to add HBase dependencies and conf with spark-submit?
+user@hbase 2014-10-15 20:48 GMT+08:00 Fengyun RAO raofeng...@gmail.com: We use Spark 1.1, and HBase 0.98.1-cdh5.1.0, and need to read and write an HBase table in a Spark program. I notice there are spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage the extra classpath, or even the deprecated SPARK_CLASSPATH. The problem is: which classpath or jars should we append? I can simply add the whole `hbase classpath`, which is huge, but this leads to dependency conflicts, e.g. HBase uses guava-12 while Spark uses guava-14.
Re: A question about streaming throughput
Ok, I understand. But in both cases the data are in the same processing node. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/A-question-about-streaming-throughput-tp16416p16501.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to close resources shared in executor?
Pardon me - there was a typo in the previous email. Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, the connection would be released. Cheers On Wed, Oct 15, 2014 at 7:13 AM, Ted Yu yuzhih...@gmail.com wrote: Have you tried the following ? val result = rdd.map(line => { val table = Util.Connection.getTable("user") ... Util.Connection.close() } On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO raofeng...@gmail.com wrote: In order to share an HBase connection pool, we create an object: object Util { val HBaseConf = HBaseConfiguration.create; val Connection = HConnectionManager.createConnection(HBaseConf) } which would be shared among tasks on the same executor, e.g. val result = rdd.map(line => { val table = Util.Connection.getTable("user") ... }). However, we don't know how to close the Util.Connection. If we write Util.Connection.close() in the main function, it'll only run on the driver, not on the executors. So, how do we make sure every Connection is closed before exit?
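A minimal sketch of how Ted's advice could look in the map code above, assuming the shared Util object from earlier in the thread; the mapPartitions structure is my addition so the table is obtained once per partition and closed when the partition is done, letting HConnectionManager's reference counting handle the underlying connection:

val result = rdd.mapPartitions { lines =>
  val table = Util.Connection.getTable("user")
  try {
    // materialize this partition's results before closing the table
    lines.map { line =>
      // ... use `table` for this record ...
      line
    }.toList.iterator
  } finally {
    table.close()
  }
}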
Re: How to add HBase dependencies and conf with spark-submit?
I am writing to HBase; here are my options:

export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar

spark-submit \
  --jars /opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \

- Original Message - From: Fengyun RAO raofeng...@gmail.com To: user@spark.apache.org, u...@hbase.apache.org Sent: Wednesday, October 15, 2014 6:29:21 AM Subject: Re: How to add HBase dependencies and conf with spark-submit? +user@hbase 2014-10-15 20:48 GMT+08:00 Fengyun RAO raofeng...@gmail.com : We use Spark 1.1, and HBase 0.98.1-cdh5.1.0, and need to read and write an HBase table in a Spark program. I notice there are spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage the extra classpath, or even the deprecated SPARK_CLASSPATH. The problem is: which classpath or jars should we append? I can simply add the whole `hbase classpath`, which is huge, but this leads to dependency conflicts, e.g. HBase uses guava-12 while Spark uses guava-14. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
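A minimal alternative sketch (not from the thread) using the configuration properties mentioned above instead of the deprecated SPARK_CLASSPATH; the jar path is the CDH parcel path from the message, and which extra jars are actually needed depends on what the job touches:

# spark-defaults.conf
spark.driver.extraClassPath    /opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar
spark.executor.extraClassPath  /opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar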
Re: Spark Worker crashing and Master not seeing recovered worker
This is still happening to me on mesos. Any workarounds? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-crashing-and-Master-not-seeing-recovered-worker-tp2312p16506.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming: Sentiment Analysis of Twitter streams
It looks like you're making the StreamingContext and SparkContext separately from the same conf. Instead, how about passing the SparkContext to the StreamingContext constructor? it seems like better practice and is a guess at the problem cause. On Tue, Oct 14, 2014 at 9:13 PM, SK skrishna...@gmail.com wrote: Hi, I am trying to implement simple sentiment analysis of Twitter streams in Spark/Scala. I am getting an exception and it appears when I combine SparkContext with StreamingContext in the same program. When I read the positive and negative words using only SparkContext.textFile (without creating a StreamingContext) and analyze static text files, the program works. Likewise, when I just create the twitter stream using StreamingContext (and dont create a SparkContext to create the vocabulary), the program works. The exception seems to be appearing when I combine both SparkContext and StreamingContext in the same program and I am not sure if we are not allowed to have both simultaneously. All the examples in the streaming module contain only the StreamingContext. The error transcript and my code appear below. I would appreciate your guidance in fixing this error and the right way to read static files and streams in the same program or any pointers to relevant examples. Thanks. --Error transcript - Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) -- My code below -- object TweetSentiment { def main(args: Array[String]) { val filters = args val sparkConf = new SparkConf().setAppName(TweetSentiment) val sc = new SparkContext(sparkConf) // get the list of positive words val pos_list = sc.textFile(positive-words.txt) .filter(line = !line.isEmpty()) .collect() .toSet // get the list of negative words val neg_list = sc.textFile(negative-words.txt) .filter(line = !line.isEmpty()) .collect() .toSet // create twitter stream val ssc = new StreamingContext(sparkConf, Seconds(60)) val stream = TwitterUtils.createStream(ssc, None, filters) val tweets = stream.map(r = r.getText) tweets.print() // print tweet text ssc.start() ssc.awaitTermination() sc.stop() // I tried commenting this, but the exception still appeared. 
} -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
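A minimal sketch of the change Sean is suggesting (and which SK later confirms works): build the StreamingContext on top of the already-created SparkContext rather than constructing a second context from the same SparkConf.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("TweetSentiment")
val sc = new SparkContext(sparkConf)
// ... read positive-words.txt / negative-words.txt with sc as before ...

// reuse the existing SparkContext for streaming,
// instead of new StreamingContext(sparkConf, Seconds(60))
val ssc = new StreamingContext(sc, Seconds(60))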
matrix operations?
hi there... are there any other matrix operations in addition to multiply()? like addition or dot product? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/matrix-operations-tp16508.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RowMatrix.multiply() ?
hi.. it looks like RowMatrix.multiply() takes a local Matrix as a parameter and returns the result as a distributed RowMatrix. how do you perform this series of multiplications if A, B, C, and D are all RowMatrix? (((A x B) x C) x D) thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RowMatrix-multiply-tp16509.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
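One workaround sketch (an assumption, not an official MLlib recipe): since RowMatrix.multiply() only accepts a local Matrix, the chain only works if B, C and D are small enough to be pulled onto the driver as local matrices. The toLocal helper below is hypothetical, and a, b, c, d stand for the RowMatrix values in the question.

import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Hypothetical helper: collect a small RowMatrix into a local, column-major Matrix.
// Only valid when the matrix fits in driver memory; row order is whatever the
// underlying RDD yields, since RowMatrix rows are not indexed.
def toLocal(m: RowMatrix): Matrix = {
  val rows = m.rows.collect()
  val numRows = rows.length
  val numCols = rows.head.size
  val values = new Array[Double](numRows * numCols)
  for (i <- rows.indices; j <- 0 until numCols)
    values(j * numRows + i) = rows(i)(j)
  Matrices.dense(numRows, numCols, values)
}

// (((A x B) x C) x D), keeping only A distributed
val result: RowMatrix = a.multiply(toLocal(b)).multiply(toLocal(c)).multiply(toLocal(d))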
Re: SparkSQL IndexOutOfBoundsException when reading from Parquet
Hi Yin, pqt_rdt_snappy has 76 columns. These two parquet tables were created via Hive 0.12 from existing Avro data using CREATE TABLE followed by an INSERT OVERWRITE. These are partitioned tables - pqt_rdt_snappy has one partition while pqt_segcust_snappy has two partitions. For pqt_segcust_snappy, I noticed that when I populated it with a single INSERT OVERWRITE over all the partitions and then executed the Spark code, it would report an illegal index value of 29. However, if I manually did INSERT OVERWRITE for every single partition, I would get an illegal index value of 21. I don’t know if this will help in debugging, but here’s the DESCRIBE output for pqt_segcust_snappy:

OK
col_name             data_type   comment
customer_id          string      from deserializer
age_range            string      from deserializer
gender               string      from deserializer
last_tx_date         bigint      from deserializer
last_tx_date_ts      string      from deserializer
last_tx_date_dt      string      from deserializer
first_tx_date        bigint      from deserializer
first_tx_date_ts     string      from deserializer
first_tx_date_dt     string      from deserializer
second_tx_date       bigint      from deserializer
second_tx_date_ts    string      from deserializer
second_tx_date_dt    string      from deserializer
third_tx_date        bigint      from deserializer
third_tx_date_ts     string      from deserializer
third_tx_date_dt     string      from deserializer
frequency            double      from deserializer
tx_size              double      from deserializer
recency              double      from deserializer
rfm                  double      from deserializer
tx_count             bigint      from deserializer
sales                double      from deserializer
coll_def_id          string      None
seg_def_id           string      None

# Partition Information
# col_name           data_type   comment
coll_def_id          string      None
seg_def_id           string      None

Time taken: 0.788 seconds, Fetched: 29 row(s)

As you can see, I have 21 data columns, followed by the 2 partition columns, coll_def_id and seg_def_id. Output shows 29 rows, but that looks like it’s just counting the rows in the console output. Let me know if you need more information. Thanks -Terry From: Yin Huai huaiyin@gmail.com Date: Tuesday, October 14, 2014 at 6:29 PM To: Terry Siu terry@smartfocus.com Cc: Michael Armbrust mich...@databricks.com, user@spark.apache.org Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet Hello Terry, How many columns does pqt_rdt_snappy have? Thanks, Yin On Tue, Oct 14, 2014 at 11:52 AM, Terry Siu terry@smartfocus.com wrote: Hi Michael, That worked for me. At least I’m now further than I was. Thanks for the tip! -Terry From: Michael Armbrust mich...@databricks.com Date: Monday, October 13, 2014 at 5:05 PM To: Terry Siu terry@smartfocus.com Cc: user@spark.apache.org Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet There are some known bugs with the parquet serde and spark 1.1. You can try setting spark.sql.hive.convertMetastoreParquet=true to cause spark sql to use built in parquet support when the serde looks like parquet. On Mon, Oct 13, 2014 at 2:57 PM, Terry Siu terry@smartfocus.com wrote: I am currently using Spark 1.1.0 that has been compiled against Hadoop 2.3. Our cluster is CDH5.1.2 which runs Hive 0.12. I have two external Hive tables that point to Parquet (compressed with Snappy), which were converted over from Avro if that matters.
I am trying to perform a join with these two Hive tables, but am encountering an exception. In a nutshell, I launch a spark shell, create my HiveContext (pointing to the correct metastore on our cluster), and then proceed to do the following:

scala> val hc = new HiveContext(sc)
scala> val txn = hc.sql("select * from pqt_rdt_snappy where transdate >= 132537600 and transdate <= 134006399")
scala> val segcust = hc.sql("select * from pqt_segcust_snappy where coll_def_id='abcd'")
scala> txn.registerAsTable("segTxns")
scala> segcust.registerAsTable("segCusts")
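For reference, a minimal sketch of applying the workaround Michael suggested earlier in the thread, set on the HiveContext before running the queries (the table name is the one from the thread):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// have Spark SQL use its built-in Parquet support instead of the Hive parquet SerDe
hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")
val txn = hc.sql("select * from pqt_rdt_snappy")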
Re: SPARK_SUBMIT_CLASSPATH question
I guess I was a little light on the details in my haste. I'm using Spark on YARN, and this is in the driver process in yarn-client mode (most notably spark-shell). I've had to manually add a bunch of JARs that I had thought it would just pick up like everything else does: export SPARK_SUBMIT_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64:$SPARK_SUBMIT_LIBRARY_PATH export SPARK_SUBMIT_CLASSPATH=/usr/lib/hadoop/lib/hadoop-openstack-2.4.0.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar:/usr/lib/spark-yarn/lib/datanucleus-api-jdo-3.2.6.jar:/usr/lib/spark-yarn/lib/datanucleus-core-3.2.10.jar:/usr/lib/spark-yarn/lib/datanucleus-rdbms-3.2.9.jar:/usr/lib/hadoop/lib/hadoop-lzo-0.6.0.jar:$SPARK_SUBMIT_CLASSPATH The lzo jar and the SPARK_SUBMIT_LIBRARY_PATH were required to get anything at all to work. Without them, basic communication failed because it couldn't load the lzo library to compress/decompress the data. The datanucleus stuff was required for hive on spark, and the hadoop-openstack and jackson jars are for the swiftfs hdfs plugin to work from within spark-shell. I tried stuff like: export SPARK_SUBMIT_CLASSPATH=/usr/lib/hadoop/lib/* But that didn't work at all. I have to specify every individual jar like that. Is there something I'm missing or some easier way to accomplish this? I'm worried that I'll keep finding more missing dependencies as we explore other features and the classpath string is going to take up a whole screen. Greg From: Greg greg.h...@rackspace.commailto:greg.h...@rackspace.com Date: Tuesday, October 14, 2014 1:57 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: SPARK_SUBMIT_CLASSPATH question It seems to me that SPARK_SUBMIT_CLASSPATH does not follow the same ability as other tools to put wildcards in the paths you add. For some reason it doesn't pick up the classpath information from yarn-site.xml either, it seems, when running on YARN. I'm having to manually add every single dependency JAR. There must be a better way, so what am I missing? Greg
Re: Problem executing Spark via JBoss application
From this line : Removing executor app-20141015142644-0125/0 because it is EXITED I would guess that you need to examine the executor log to see why the executor actually exited. My guess would be that the executor cannot connect back to your driver. But check the log from the executor. It should be in SPARK_HOME/work/app-id/executor_id/stderr on the worker box, I believe. On Wed, Oct 15, 2014 at 8:56 AM, Mehdi Singer mehdi.sin...@lampiris.be wrote: Hi, I have a Spark standalone example application which is working fine. I'm now trying to integrate this application into a J2EE application, deployed on JBoss 7.1.1 and accessed via a web service. The JBoss server is installed on my local machine (Windows 7) and the master Spark is remote (Linux). The example simply executes a count on my RDD. When I call the webservice I'm getting the following error at JBoss side when executing the count: 11:48:10,232 ERROR [org.apache.catalina.core.ContainerBase.[jboss.web].[default-host].[/el2-etrm-spark].[ws]] (http--127.0.0.1-8082-3) Servlet.service() pour la servlet ws a généré une exception: java.lang.RuntimeException: org.apache.cxf.interceptor.Fault: Job cancelled because SparkContext was shut down at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:116) [cxf-api-2.6.9.jar:2.6.9] at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:322) [cxf-api-2.6.9.jar:2.4.3] at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) [cxf-api-2.6.9.jar:2.6.9] at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:130) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:221) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doGet(AbstractHTTPServlet.java:146) [cxf-bundle-2.6.2.jar:2.6.2] at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) [jboss-servlet-api_3.0_spec-1.0.0.Final.jar:1.0.0.Final] at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:197) [cxf-bundle-2.6.2.jar:2.6.2] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:329) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter.doFilterInternal(OpenEntityManagerInViewFilter.java:180) [spring-orm-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:186) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE] at 
org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:259) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:275) [jbossweb-7.0.13.Final.jar:] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:161) [jbossweb-7.0.13.Final.jar:]
Serialize/deserialize Naive Bayes model and index files
Hi, I am trying to persist the files generated as a result of Naive Bayes training with MLlib. These comprise the model file, a label index (own class) and a term dictionary (own class). I need to save them to an HDFS location and then deserialize them when needed for prediction. How can I do the same with Spark? Also, I have the option of saving these instances in HBase in binary form. Which approach makes more sense? Thanks, Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialize-deserialize-Naive-Bayes-model-and-index-files-tp16513.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
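There is no built-in save/load for NaiveBayesModel in Spark 1.1, so one rough sketch (an assumption, not an official API) is plain Java serialization to HDFS; it works as long as the model and your own label-index/dictionary classes are java-serializable. The function names and paths are hypothetical.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.mllib.classification.NaiveBayesModel

// Write the trained model to an HDFS path via Java serialization.
def saveToHdfs(model: NaiveBayesModel, path: String, conf: Configuration): Unit = {
  val fs = FileSystem.get(conf)
  val out = new ObjectOutputStream(fs.create(new Path(path)))
  try out.writeObject(model) finally out.close()
}

// Read it back for prediction.
def loadFromHdfs(path: String, conf: Configuration): NaiveBayesModel = {
  val fs = FileSystem.get(conf)
  val in = new ObjectInputStream(fs.open(new Path(path)))
  try in.readObject().asInstanceOf[NaiveBayesModel] finally in.close()
}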
spark-sql not coming up with Hive 0.10.0/CDH 4.6
Hi, I compiled spark 1.1.0 with CDH 4.6 but when I try to get spark-sql cli up, it gives error: == [atangri@pit-uat-hdputil1 bin]$ ./spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Unable to initialize logging using hive-log4j.properties, not found on CLASSPATH! Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/10/15 17:45:17 INFO SecurityManager: Changing view acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: Changing modify acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(atangri, ); users with modify permissions: Set(atangri, ) 14/10/15 17:45:17 INFO Slf4jLogger: Slf4jLogger started 14/10/15 17:45:17 INFO Remoting: Starting remoting 14/10/15 17:45:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Utils: Successfully started service 'sparkDriver' on port 54506. 14/10/15 17:45:17 INFO SparkEnv: Registering MapOutputTracker 14/10/15 17:45:17 INFO SparkEnv: Registering BlockManagerMaster 14/10/15 17:45:17 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141015174517-bdfa 14/10/15 17:45:17 INFO Utils: Successfully started service 'Connection manager for block manager' on port 58400. 14/10/15 17:45:17 INFO ConnectionManager: Bound socket to port 58400 with id = ConnectionManagerId(pit-uat-hdputil1.snc1,58400) 14/10/15 17:45:17 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 14/10/15 17:45:17 INFO BlockManagerMaster: Trying to register BlockManager 14/10/15 17:45:17 INFO BlockManagerMasterActor: Registering block manager pit-uat-hdputil1.snc1:58400 with 265.1 MB RAM 14/10/15 17:45:17 INFO BlockManagerMaster: Registered BlockManager 14/10/15 17:45:17 INFO HttpFileServer: HTTP File server directory is /tmp/spark-c7f28004-6189-424f-a214-379d5dcc72b7 14/10/15 17:45:17 INFO HttpServer: Starting HTTP Server 14/10/15 17:45:17 INFO Utils: Successfully started service 'HTTP file server' on port 33666. 14/10/15 17:45:18 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
14/10/15 17:45:18 INFO SparkUI: Started SparkUI at http://pit-uat-hdputil1.snc1:4040 14/10/15 17:45:18 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506/user/HeartbeatReceiver spark-sql show tables; 14/10/15 17:45:22 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:22 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: PERFLOG method=Driver.run 14/10/15 17:45:23 INFO Driver: PERFLOG method=TimeToSubmit 14/10/15 17:45:23 INFO Driver: PERFLOG method=compile 14/10/15 17:45:23 INFO Driver: PERFLOG method=parse 14/10/15 17:45:23 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:23 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=parse start=1413395123538 end=1413395123539 duration=1 14/10/15 17:45:23 INFO Driver: PERFLOG method=semanticAnalyze 14/10/15 17:45:23 INFO Driver: Semantic Analysis Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=semanticAnalyze start=1413395123539 end=1413395123641 duration=102 14/10/15 17:45:23 INFO ListSinkOperator: Initializing Self 0 OP 14/10/15 17:45:23 INFO ListSinkOperator: Operator 0 OP initialized 14/10/15 17:45:23 INFO ListSinkOperator: Initialization Done 0 OP 14/10/15 17:45:23 INFO Driver: Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null) 14/10/15 17:45:23 INFO Driver: /PERFLOG method=compile start=1413395123517 end=1413395123696 duration=179 14/10/15 17:45:23 INFO Driver: PERFLOG method=Driver.execute 14/10/15 17:45:23 INFO Driver: Starting command: show tables 14/10/15 17:45:23 INFO Driver: /PERFLOG method=TimeToSubmit start=1413395123517 end=1413395123698 duration=181 14/10/15 17:45:23 INFO Driver: PERFLOG method=runTasks 14/10/15 17:45:23 INFO Driver: PERFLOG method=task.DDL.Stage-0 14/10/15 17:45:23 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 14/10/15 17:45:23 INFO ObjectStore: ObjectStore, initialize called 14/10/15 17:45:23 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 14/10/15 17:45:24 WARN BoneCPConfig: Max Connections 1. Setting to 20 14/10/15 17:45:24 INFO ObjectStore: Setting MetaStore object pin classes with
Re: spark-sql not coming up with Hive 0.10.0/CDH 4.6
I see Hive 0.10.0 metastore sql does not have a VERSION table but spark is looking for it. Anyone else faced this issue or any ideas on how to fix it ? Thanks, Anurag Tangri On Wed, Oct 15, 2014 at 10:51 AM, Anurag Tangri atan...@groupon.com wrote: Hi, I compiled spark 1.1.0 with CDH 4.6 but when I try to get spark-sql cli up, it gives error: == [atangri@pit-uat-hdputil1 bin]$ ./spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Unable to initialize logging using hive-log4j.properties, not found on CLASSPATH! Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/10/15 17:45:17 INFO SecurityManager: Changing view acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: Changing modify acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(atangri, ); users with modify permissions: Set(atangri, ) 14/10/15 17:45:17 INFO Slf4jLogger: Slf4jLogger started 14/10/15 17:45:17 INFO Remoting: Starting remoting 14/10/15 17:45:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Utils: Successfully started service 'sparkDriver' on port 54506. 14/10/15 17:45:17 INFO SparkEnv: Registering MapOutputTracker 14/10/15 17:45:17 INFO SparkEnv: Registering BlockManagerMaster 14/10/15 17:45:17 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141015174517-bdfa 14/10/15 17:45:17 INFO Utils: Successfully started service 'Connection manager for block manager' on port 58400. 14/10/15 17:45:17 INFO ConnectionManager: Bound socket to port 58400 with id = ConnectionManagerId(pit-uat-hdputil1.snc1,58400) 14/10/15 17:45:17 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 14/10/15 17:45:17 INFO BlockManagerMaster: Trying to register BlockManager 14/10/15 17:45:17 INFO BlockManagerMasterActor: Registering block manager pit-uat-hdputil1.snc1:58400 with 265.1 MB RAM 14/10/15 17:45:17 INFO BlockManagerMaster: Registered BlockManager 14/10/15 17:45:17 INFO HttpFileServer: HTTP File server directory is /tmp/spark-c7f28004-6189-424f-a214-379d5dcc72b7 14/10/15 17:45:17 INFO HttpServer: Starting HTTP Server 14/10/15 17:45:17 INFO Utils: Successfully started service 'HTTP file server' on port 33666. 14/10/15 17:45:18 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
14/10/15 17:45:18 INFO SparkUI: Started SparkUI at http://pit-uat-hdputil1.snc1:4040 14/10/15 17:45:18 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506/user/HeartbeatReceiver spark-sql show tables; 14/10/15 17:45:22 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:22 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: PERFLOG method=Driver.run 14/10/15 17:45:23 INFO Driver: PERFLOG method=TimeToSubmit 14/10/15 17:45:23 INFO Driver: PERFLOG method=compile 14/10/15 17:45:23 INFO Driver: PERFLOG method=parse 14/10/15 17:45:23 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:23 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=parse start=1413395123538 end=1413395123539 duration=1 14/10/15 17:45:23 INFO Driver: PERFLOG method=semanticAnalyze 14/10/15 17:45:23 INFO Driver: Semantic Analysis Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=semanticAnalyze start=1413395123539 end=1413395123641 duration=102 14/10/15 17:45:23 INFO ListSinkOperator: Initializing Self 0 OP 14/10/15 17:45:23 INFO ListSinkOperator: Operator 0 OP initialized 14/10/15 17:45:23 INFO ListSinkOperator: Initialization Done 0 OP 14/10/15 17:45:23 INFO Driver: Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null) 14/10/15 17:45:23 INFO Driver: /PERFLOG method=compile start=1413395123517 end=1413395123696 duration=179 14/10/15 17:45:23 INFO Driver: PERFLOG method=Driver.execute 14/10/15 17:45:23 INFO Driver: Starting command: show tables 14/10/15 17:45:23 INFO Driver: /PERFLOG method=TimeToSubmit start=1413395123517 end=1413395123698 duration=181 14/10/15 17:45:23 INFO Driver: PERFLOG method=runTasks 14/10/15 17:45:23 INFO Driver: PERFLOG method=task.DDL.Stage-0 14/10/15 17:45:23 INFO HiveMetaStore: 0: Opening raw store with implemenation
Re: spark-sql not coming up with Hive 0.10.0/CDH 4.6
Hi Anurag, Spark SQL (from the Spark standard distribution / sources) currently requires Hive 0.12; as you mention, CDH4 has Hive 0.10, so that's not gonna work. CDH 5.2 ships with Spark 1.1.0 and is modified so that Spark SQL can talk to the Hive 0.13.1 that is also bundled with CDH, so if that's an option for you, you could try it out. On Wed, Oct 15, 2014 at 11:23 AM, Anurag Tangri atan...@groupon.com wrote: I see Hive 0.10.0 metastore sql does not have a VERSION table but spark is looking for it. Anyone else faced this issue or any ideas on how to fix it ? Thanks, Anurag Tangri On Wed, Oct 15, 2014 at 10:51 AM, Anurag Tangri atan...@groupon.com wrote: Hi, I compiled spark 1.1.0 with CDH 4.6 but when I try to get spark-sql cli up, it gives error: == [atangri@pit-uat-hdputil1 bin]$ ./spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Unable to initialize logging using hive-log4j.properties, not found on CLASSPATH! Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/10/15 17:45:17 INFO SecurityManager: Changing view acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: Changing modify acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(atangri, ); users with modify permissions: Set(atangri, ) 14/10/15 17:45:17 INFO Slf4jLogger: Slf4jLogger started 14/10/15 17:45:17 INFO Remoting: Starting remoting 14/10/15 17:45:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Utils: Successfully started service 'sparkDriver' on port 54506. 14/10/15 17:45:17 INFO SparkEnv: Registering MapOutputTracker 14/10/15 17:45:17 INFO SparkEnv: Registering BlockManagerMaster 14/10/15 17:45:17 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141015174517-bdfa 14/10/15 17:45:17 INFO Utils: Successfully started service 'Connection manager for block manager' on port 58400. 14/10/15 17:45:17 INFO ConnectionManager: Bound socket to port 58400 with id = ConnectionManagerId(pit-uat-hdputil1.snc1,58400) 14/10/15 17:45:17 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 14/10/15 17:45:17 INFO BlockManagerMaster: Trying to register BlockManager 14/10/15 17:45:17 INFO BlockManagerMasterActor: Registering block manager pit-uat-hdputil1.snc1:58400 with 265.1 MB RAM 14/10/15 17:45:17 INFO BlockManagerMaster: Registered BlockManager 14/10/15 17:45:17 INFO HttpFileServer: HTTP File server directory is /tmp/spark-c7f28004-6189-424f-a214-379d5dcc72b7 14/10/15 17:45:17 INFO HttpServer: Starting HTTP Server 14/10/15 17:45:17 INFO Utils: Successfully started service 'HTTP file server' on port 33666. 14/10/15 17:45:18 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
14/10/15 17:45:18 INFO SparkUI: Started SparkUI at http://pit-uat-hdputil1.snc1:4040 14/10/15 17:45:18 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506/user/HeartbeatReceiver spark-sql show tables; 14/10/15 17:45:22 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:22 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: PERFLOG method=Driver.run 14/10/15 17:45:23 INFO Driver: PERFLOG method=TimeToSubmit 14/10/15 17:45:23 INFO Driver: PERFLOG method=compile 14/10/15 17:45:23 INFO Driver: PERFLOG method=parse 14/10/15 17:45:23 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:23 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=parse start=1413395123538 end=1413395123539 duration=1 14/10/15 17:45:23 INFO Driver: PERFLOG method=semanticAnalyze 14/10/15 17:45:23 INFO Driver: Semantic Analysis Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=semanticAnalyze start=1413395123539 end=1413395123641 duration=102 14/10/15 17:45:23 INFO ListSinkOperator: Initializing Self 0 OP 14/10/15 17:45:23 INFO ListSinkOperator: Operator 0 OP initialized 14/10/15 17:45:23 INFO ListSinkOperator: Initialization Done 0 OP 14/10/15 17:45:23 INFO Driver: Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null) 14/10/15 17:45:23 INFO Driver: /PERFLOG method=compile start=1413395123517 end=1413395123696 duration=179 14/10/15 17:45:23 INFO Driver: PERFLOG
Re: spark-sql not coming up with Hive 0.10.0/CDH 4.6
Hi Marcelo, Exactly. Found it few minutes ago. I ran mysql hive 12 sql on my hive 10 metastore, which created missing tables and it seems to be working now. Not sure if everything else in CDH 4.6/Hive 10 would also still be working though or not. Looks like we cannot use Spark SQL in a clean way with CDH4 unless we upgrade to CDH5. Thanks for your response! Thanks, Anurag Tangri On Wed, Oct 15, 2014 at 12:02 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Anurag, Spark SQL (from the Spark standard distribution / sources) currently requires Hive 0.12; as you mention, CDH4 has Hive 0.10, so that's not gonna work. CDH 5.2 ships with Spark 1.1.0 and is modified so that Spark SQL can talk to the Hive 0.13.1 that is also bundled with CDH, so if that's an option for you, you could try it out. On Wed, Oct 15, 2014 at 11:23 AM, Anurag Tangri atan...@groupon.com wrote: I see Hive 0.10.0 metastore sql does not have a VERSION table but spark is looking for it. Anyone else faced this issue or any ideas on how to fix it ? Thanks, Anurag Tangri On Wed, Oct 15, 2014 at 10:51 AM, Anurag Tangri atan...@groupon.com wrote: Hi, I compiled spark 1.1.0 with CDH 4.6 but when I try to get spark-sql cli up, it gives error: == [atangri@pit-uat-hdputil1 bin]$ ./spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Unable to initialize logging using hive-log4j.properties, not found on CLASSPATH! Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/10/15 17:45:17 INFO SecurityManager: Changing view acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: Changing modify acls to: atangri, 14/10/15 17:45:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(atangri, ); users with modify permissions: Set(atangri, ) 14/10/15 17:45:17 INFO Slf4jLogger: Slf4jLogger started 14/10/15 17:45:17 INFO Remoting: Starting remoting 14/10/15 17:45:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@pit-uat-hdputil1.snc1:54506] 14/10/15 17:45:17 INFO Utils: Successfully started service 'sparkDriver' on port 54506. 14/10/15 17:45:17 INFO SparkEnv: Registering MapOutputTracker 14/10/15 17:45:17 INFO SparkEnv: Registering BlockManagerMaster 14/10/15 17:45:17 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141015174517-bdfa 14/10/15 17:45:17 INFO Utils: Successfully started service 'Connection manager for block manager' on port 58400. 
14/10/15 17:45:17 INFO ConnectionManager: Bound socket to port 58400 with id = ConnectionManagerId(pit-uat-hdputil1.snc1,58400) 14/10/15 17:45:17 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 14/10/15 17:45:17 INFO BlockManagerMaster: Trying to register BlockManager 14/10/15 17:45:17 INFO BlockManagerMasterActor: Registering block manager pit-uat-hdputil1.snc1:58400 with 265.1 MB RAM 14/10/15 17:45:17 INFO BlockManagerMaster: Registered BlockManager 14/10/15 17:45:17 INFO HttpFileServer: HTTP File server directory is /tmp/spark-c7f28004-6189-424f-a214-379d5dcc72b7 14/10/15 17:45:17 INFO HttpServer: Starting HTTP Server 14/10/15 17:45:17 INFO Utils: Successfully started service 'HTTP file server' on port 33666. 14/10/15 17:45:18 INFO Utils: Successfully started service 'SparkUI' on port 4040. 14/10/15 17:45:18 INFO SparkUI: Started SparkUI at http://pit-uat-hdputil1.snc1:4040 14/10/15 17:45:18 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@pit-uat-hdputil1.snc1 :54506/user/HeartbeatReceiver spark-sql show tables; 14/10/15 17:45:22 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:22 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: PERFLOG method=Driver.run 14/10/15 17:45:23 INFO Driver: PERFLOG method=TimeToSubmit 14/10/15 17:45:23 INFO Driver: PERFLOG method=compile 14/10/15 17:45:23 INFO Driver: PERFLOG method=parse 14/10/15 17:45:23 INFO ParseDriver: Parsing command: show tables 14/10/15 17:45:23 INFO ParseDriver: Parse Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=parse start=1413395123538 end=1413395123539 duration=1 14/10/15 17:45:23 INFO Driver: PERFLOG method=semanticAnalyze 14/10/15 17:45:23 INFO Driver: Semantic Analysis Completed 14/10/15 17:45:23 INFO Driver: /PERFLOG method=semanticAnalyze
Exception while reading SendingConnection to ConnectionManagerId
Hi there, I'm running spark on ec2, and am running into an error there that I don't get locally. Here's the error: 11335 [handle-read-write-executor-3] ERROR org.apache.spark.network.SendingConnection - Exception while reading SendingConnection to ConnectionManagerId([IP HERE]) java.nio.channels.ClosedChannelException Does anyone know what might be causing this? Spark is running on my ec2 instances. Thanks, Jimmy
Re: Spark Streaming: Sentiment Analysis of Twitter streams
You are right. Creating the StreamingContext from the SparkContext instead of SparkConf helped. Thanks for the help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410p16520.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark tasks still scheduled after Spark goes down
Hi, My setup: tomcat (running a web app which initializes SparkContext) and a dedicated Spark cluster (1 master, 2 workers, 1 VM per each). I am able to properly start this setup where SparkContext properly initializes the connection with the master. I am able to execute tasks and perform the required calculations... everything works fine. The problem I'm facing is the situation where the Spark cluster goes down after the mentioned proper startup (I'm trying to mimic a possible production issue where the Spark cluster simply goes down for a reason and my web application should still work apart from the Spark-related functionality). What happens is that even though the Spark cluster is not there, DAGScheduler still schedules tasks and creates JobWaiters which wait endlessly for task completion, blocking the main thread. As a result my application runs out of available threads (this is happening in the part where I handle JMS with a pool of 10 threads) and can't proceed working correctly. I do not see any error in the logs apart from Akka endlessly trying to reconnect to the MasterExecutor. Is this a known issue or am I missing something obvious in the configuration? Thanks a lot for any suggestion. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-tasks-still-scheduled-after-Spark-goes-down-tp16521.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming Empty DStream / RDD and reduceByKey
Hi All, I figured out what the problem was. Thank you Sean for pointing me in the right direction. All the jibber jabber about empty DStream / RDD was all just pure nonsense [?] . I guess the sequence of events (the fact that spark streaming started crashing just after I implemented the reduceBykey) and reading the log file lead me to believe that there was something wrong with the way I implemented the reduceByKey. In fact there was nothing wrong with the reduceByKey implementation. Just for closure (no pun intended), i will try and explain what happened. Maybe it will help someone else in the future. Initially, my driver code had this definition - SparkConf sparkConf = new SparkConf().setMaster(yarn-cluster).setAppName(Streaming WordCount); sparkConf.set(spark.shuffle.manager, SORT); sparkConf.set(spark.streaming.unpersist, true); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000)); MapString, String kafkaConf = new HashMapString, String(); kafkaConf.put(zookeeper.connect, zookeeper); kafkaConf.put(group.id, consumerGrp); kafkaConf.put(auto.offset.reset, largest); kafkaConf.put(zookeeper.conection.timeout.ms, 1000); kafkaConf.put(rebalance.max.retries, 20); kafkaConf.put(rebalance.backoff.ms, 3); MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap = wordMap.filter(new wordFilter()); wordMap.print(); jssc.start(); jssc.awaitTermination(); The above code does not have a reduceByKey. All I was doing was printing out was the pair [String, 1], and things worked perfectly fine. I started spark streaming and then stated the kafka producer and in the logs I could see the results. So far so good. Then I proceeded to introduce the reduceByKey, to count the words in each batch. I created a ReduceWords.java file with the class ReduceWords with the following definition. public class ReduceWords implements Function2Integer, Integer, Integer { private static final long serialVersionUID = -6076139388549335886L; public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } } and in my driver code, I introduced reduceByKey as follows - ... ... ... 
MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap = wordMap.filter(new wordFilter()); JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords()); wordCount.print(); jssc.start(); jssc.awaitTermination(); This is when I started getting the exceptions and spark started to crash. So my instinct was to presume that something about reduceByKey was at fault. Then Sean pointed me to the idea that, reference to the containing class may have been serialized in the closure. But the issue was ReduceWords is just a regular class in its own java file. It is not an inner or anonymous class. This was what stumped me. I just could not figure out how ReduceWord could reference in any shape or form the driver class. The problem it turns out was the following - ... ... ... MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new
Getting the value from DStream[Int]
Hi, As a result of a reduction operation, the resultant value score is a DStream[Int]. How can I get the simple Int value? I tried score[0] and score._1, but neither worked, and I can't find a getValue() in the DStream API. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-value-from-DStream-Int-tp16525.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
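A DStream is a sequence of RDDs (one per batch), so there is no single Int to read on the driver; the value is pulled out per batch inside foreachRDD. A minimal sketch, assuming score carries at most one element per batch:

score.foreachRDD { rdd =>
  // take(1) avoids failing when a batch produced no data
  rdd.take(1).foreach { value =>
    println("score for this batch: " + value)
  }
}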
Spark Streaming is slower than Spark
Hi, I am evaluating Spark Streaming with Kafka and I found that Spark Streaming is slower than Spark. It took more time to process the same amount of data; as per the Spark console it can process 2300 records per second. Is my assumption correct? Spark Streaming has to do a lot of this along with processing at the same time, so that's why it is slow. Tarun
Re: SPARK_SUBMIT_CLASSPATH question
Hi Greg, I'm not sure exactly what it is that you're trying to achieve, but I'm pretty sure those variables are not supposed to be set by users. You should take a look at the documentation for spark.driver.extraClassPath and spark.driver.extraLibraryPath, and the equivalent options for executors. (The driver ones also have direct equivalents in the spark-submit command line, check its help output.) Since you're running on Yarn, you might also want to take a look at https://issues.apache.org/jira/browse/SPARK-1719 (and SPARK-1720). On Tue, Oct 14, 2014 at 11:57 AM, Greg Hill greg.h...@rackspace.com wrote: It seems to me that SPARK_SUBMIT_CLASSPATH does not follow the same ability as other tools to put wildcards in the paths you add. For some reason it doesn't pick up the classpath information from yarn-site.xml either, it seems, when running on YARN. I'm having to manually add every single dependency JAR. There must be a better way, so what am I missing? Greg -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
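As a sketch of what that could look like (the paths are simply the ones from Greg's earlier message, and which jars are needed is deployment-specific), the same additions can live in spark-defaults.conf instead of the SPARK_SUBMIT_* variables:

# spark-defaults.conf
spark.driver.extraLibraryPath   /usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64
spark.driver.extraClassPath     /usr/lib/hadoop/lib/hadoop-lzo-0.6.0.jar:/usr/lib/hadoop/lib/hadoop-openstack-2.4.0.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar
spark.executor.extraClassPath   /usr/lib/hadoop/lib/hadoop-lzo-0.6.0.jar

The driver-side values can also be passed per job with the --driver-class-path and --driver-library-path options of spark-submit.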
how to set log level of spark executor on YARN(using yarn-cluster mode)
Hi, I want to check the DEBUG log of the spark executor on YARN (using yarn-cluster mode), but neither of these works: 1. yarn daemonlog setlevel DEBUG YarnChild.class 2. setting log4j.properties in the spark/conf folder on the client node. So how can I set the log level of the spark executor in the YARN container to DEBUG? Thanks! -- Wang Haihua
Re: how to set log level of spark executor on YARN(using yarn-cluster mode)
Hi Eric, Check the Debugging Your Application section at: http://spark.apache.org/docs/latest/running-on-yarn.html Long story short: upload your log4j.properties using the --files argument of spark-submit. (Mental note: we could make the log level configurable via a system property...) On Wed, Oct 15, 2014 at 5:58 PM, eric wong win19...@gmail.com wrote: Hi, I want to check the DEBUG log of spark executor on YARN(using yarn-cluster mode), but 1. yarn daemonlog setlevel DEBUG YarnChild.class 2. set log4j.properties in spark/conf folder on client node. no means above works. So how could i set the log level of spark executor on YARN container to DEBUG? Thanks! -- Wang Haihua -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
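For reference, a minimal sketch of that suggestion (the application class, jar and paths are hypothetical); the log4j.properties below is based on Spark's default template with the root level switched to DEBUG, and --files ships it into each YARN container's working directory where it is picked up:

# log4j.properties
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

spark-submit --master yarn-cluster \
  --files /path/to/log4j.properties \
  --class com.example.MyApp my-app.jar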
Re: Spark KMeans hangs at reduceByKey / collectAsMap
Hi Xiangrui, I am using yarn-cluster mode. The current Hadoop cluster is configured to only accept yarn-cluster mode and not allow yarn-client mode, and I have no privilege to change that. Without initializing with k-means||, the job finished in 10 minutes. With k-means||, it just hangs there for almost 1 hour. I guess I can only go with random initialization in KMeans. Thanks again for your help. Ray -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-KMeans-hangs-at-reduceByKey-collectAsMap-tp16413p16530.html
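For reference, a minimal sketch of switching KMeans to random initialization via MLlib's Java API (k, the iteration count, and the input RDD name are assumptions):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;

// points is assumed to be a JavaRDD<Vector> holding the feature vectors.
KMeansModel model = new KMeans()
    .setK(10)                           // assumed number of clusters
    .setMaxIterations(20)               // assumed iteration count
    .setInitializationMode("random")    // skip the k-means|| initialization passes
    .run(points.rdd());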
Play framework
Hi - Has anybody figured out how to integrate a Play application with Spark and run it on a Spark cluster using the spark-submit script? I have seen some blogs about creating a simple Play app and running it locally on a dev machine with the sbt run command. However, those steps don't work with spark-submit. If you have figured out how to build and run a Play app with spark-submit, I would appreciate it if you could share the steps and the sbt settings for your Play app. Thanks, Mohammed
Sample code for Spark streaming + Kafka + Scala + sbt?
Hi, can anyone share a project as a sample? I tried a couple of days ago but couldn't make it work. It looks like it's due to some Kafka dependency issue. I'm using sbt-assembly. Thanks, Gary
Spark's shuffle file size keeps increasing
I have a Spark application which runs Spark Streaming and Spark SQL. I observed that the size of the shuffle files under the spark.local.dir folder keeps increasing and never decreases. Eventually it runs into an out-of-disk-space error. The question is: when will Spark delete these shuffle files? In the application I use operations like updateStateByKey and have already enabled checkpointing. Thank you!
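One knob sometimes suggested for long-running streaming jobs on the 1.x line is the TTL-based cleaner; a hedged sketch, not a confirmed fix for this particular case (the application name and TTL value are assumptions):

import org.apache.spark.SparkConf;

// Enables periodic cleanup of old metadata and shuffle data. The TTL must be
// longer than any window or state the job keeps, otherwise data still in use
// would be removed.
SparkConf conf = new SparkConf()
    .setAppName("StreamingApp")         // assumed application name
    .set("spark.cleaner.ttl", "3600");  // seconds; value is an assumption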
Re: Spark Concepts
Anybody with good hands-on experience with Spark, please do reply. It would help us a lot!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Concepts-tp16477p16536.html
Re: YARN deployment of Spark and Thrift JDBC server
I would like to reiterate that I don't have Hive installed on the Hadoop cluster. I have some questions on the following comment from Cheng Lian-2: "The Thrift server is used to interact with existing Hive data, and thus needs the Hive Metastore to access the Hive catalog. In your case, you need to build Spark with sbt/sbt -Phive,hadoop-2.4 clean package. But since you've already started the Thrift server successfully, this step should already have been done properly."
1. Even though I don't have Hive installed, how can I connect my application (Microsoft Excel, etc.) to Spark SQL? Must I have Hive installed?
2. Where can I download/get Spark SQL JDBC/ODBC drivers? I could not find them on the Databricks site.
3. Could somebody point me to the steps to connect Excel with Spark SQL and get some data via SQL? Is this possible at all?
4. Which applications can be used to connect to Spark SQL?
Regards, Neeraj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/YARN-deployment-of-Spark-and-Thrift-JDBC-server-tp16374p16537.html
Re: How to write data into Hive partitioned Parquet table?
I got tipped by an expert that the "Unsupported language features in query" error I had was due to the fact that Spark SQL does not support dynamic partitions, and that I can do saveAsParquetFile() for each partition instead. My inefficient implementation is to:

//1. Run the query without the INSERT and without DISTRIBUTE BY field1 SORT BY field2.
JavaSchemaRDD rawRdd = hiveCtx.sql("SELECT field1, field2, partition_field FROM source_table");
rawRdd.registerAsTempTable("temp");

//2. Get a list of unique partition_field values.
JavaSchemaRDD partFieldsRdd = hiveCtx.sql("SELECT DISTINCT partition_field FROM temp");

//3. Iterate over each partition_field value: run a query to get a JavaSchemaRDD, then save the result as a Parquet file.
for (Row row : partFieldsRdd.toArray()) {
    String partitionVal = row.getString(0);
    hiveCtx.sql("SELECT * FROM temp WHERE partition_field=" + partitionVal)
           .saveAsParquetFile("partition_field=" + partitionVal);
}

It ran and produced the desired output. However, Hive runs orders of magnitude faster than the code above. Anyone who can shed some light on a more efficient implementation is much appreciated. Many thanks. Regards, BH

On Tue, Oct 14, 2014 at 8:44 PM, Banias H banias4sp...@gmail.com wrote: Hi, I am still new to Spark. Sorry if similar questions have been asked here before. I am trying to read a Hive table, then run a query and save the result into a Hive partitioned Parquet table. For example, I was able to run the following in Hive:

INSERT INTO TABLE target_table PARTITION (partition_field) SELECT field1, field2, partition_field FROM source_table DISTRIBUTE BY field1 SORT BY field2

But when I tried running it in spark-sql, it gave me the following error:

java.lang.RuntimeException: Unsupported language features in query: INSERT INTO TABLE ...

I also tried the following Java code and I saw the same error:

SparkConf sparkConf = new SparkConf().setAppName("Example");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaHiveContext hiveCtx = new JavaHiveContext(ctx);
JavaSchemaRDD rdd = hiveCtx.sql("INSERT INTO TABLE target_table PARTITION (partition_field) SELECT field1, field2, partition_field FROM source_table DISTRIBUTE BY field1 SORT BY field2");
...
rdd.count(); //Just for running the query

If I take out "INSERT INTO TABLE target_table PARTITION (partition_field)" from the SQL statement and run that in hiveCtx.sql(), I get an RDD, but then I only seem to be able to do rdd.saveAsParquetFile("target_table_location"), and that is not partitioned correctly. Any help is much appreciated. Thanks. Regards, BH