Re: Understanding Spark's caching
Hi, I replied to you on SO. If Option A had an action call then it should suffice too. On 28 Apr 2015 05:30, Eran Medan eran.me...@gmail.com wrote: Hi Everyone! I'm trying to understand how Spark's caching works. Here is my naive understanding, please let me know if I'm missing something:

    val rdd1 = sc.textFile("some data")
    rdd1.cache() // marks rdd1 as cached
    val rdd2 = rdd1.filter(...)
    val rdd3 = rdd1.map(...)
    rdd2.saveAsTextFile(...)
    rdd3.saveAsTextFile(...)

In the above, rdd1 will be loaded from disk (e.g. HDFS) only once (when rdd2 is saved, I assume) and then read from cache (assuming there is enough RAM) when rdd3 is saved.

Now here is my question. Let's say I want to cache rdd2 and rdd3 as they will both be used later on, but I don't need rdd1 after creating them. Basically there is duplication, isn't there? Once rdd2 and rdd3 are calculated, I don't need rdd1 anymore, so I should probably unpersist it, right? The question is when.

Will this work? (Option A)

    val rdd1 = sc.textFile("some data")
    rdd1.cache() // marks rdd1 as cached
    val rdd2 = rdd1.filter(...)
    val rdd3 = rdd1.map(...)
    rdd2.cache()
    rdd3.cache()
    rdd1.unpersist()

Does Spark add the unpersist call to the DAG, or is it done immediately? If it's done immediately, then rdd1 will effectively not be cached when I read from rdd2 and rdd3, right?

Should I do it this way instead (Option B)?

    val rdd1 = sc.textFile("some data")
    rdd1.cache() // marks rdd1 as cached
    val rdd2 = rdd1.filter(...)
    val rdd3 = rdd1.map(...)
    rdd2.cache()
    rdd3.cache()
    rdd2.saveAsTextFile(...)
    rdd3.saveAsTextFile(...)
    rdd1.unpersist()

So the question is this: is Option A good enough, i.e. will rdd1 still read the file only once? Or do I need to go with Option B? (See also http://stackoverflow.com/questions/29903675/understanding-sparks-caching) Thanks in advance
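For what it's worth, a minimal sketch of the Option B ordering (shown here in PySpark; the pipeline, lambdas, and paths are placeholders). The key point is that unpersist() takes effect immediately rather than being recorded in the DAG, so it belongs after the actions that still need rdd1:

    from pyspark import SparkContext

    sc = SparkContext(appName="cache-demo")

    rdd1 = sc.textFile("hdfs:///some/data")              # placeholder input path
    rdd1.cache()                                         # mark rdd1 as cached

    rdd2 = rdd1.filter(lambda line: "error" in line).cache()
    rdd3 = rdd1.map(lambda line: line.upper()).cache()

    # The first action loads the file and populates rdd1's cache; the second
    # materializes rdd3 from that cache instead of re-reading the file.
    rdd2.saveAsTextFile("hdfs:///out/errors")            # placeholder output
    rdd3.saveAsTextFile("hdfs:///out/upper")             # placeholder output

    # Only now is it safe to drop rdd1; its cached blocks are freed right away.
    rdd1.unpersist()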
Re: How to add jars to standalone pyspark program
Ah, just noticed that you are using an external package. You can add that like this:

    conf = (SparkConf().set("spark.jars", jar_path))

or, if it is a Python package:

    sc.addPyFile(path)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685p22688.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
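For reference, a small sketch of both suggestions together (the jar and module paths are placeholders):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("with-extra-jar")
            .set("spark.jars", "/path/to/some-package.jar"))   # placeholder jar path
    sc = SparkContext(conf=conf)

    # For a plain Python dependency, ship the .py/.zip/.egg to the executors instead:
    sc.addPyFile("/path/to/my_module.py")                      # placeholder module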
Re: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?
Worked now. On Mon, Apr 27, 2015 at 10:20 PM, Sean Owen so...@cloudera.com wrote: Works fine for me. Make sure you're not downloading the HTML redirector page and thinking it's the archive. On Mon, Apr 27, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I downloaded the 1.3.1 Hadoop 2.4 prebuilt package (tar) from multiple mirrors and the direct link. Each time I untar it I get the error below: spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty error message) tar: Error exit delayed from previous errors Is it broken? -- Deepak
Re: How to add jars to standalone pyspark program
Hi Mark, That does not look like a Python path issue; the spark-assembly jar should have those classes packaged and should make them available to the workers. Have you built the jar yourself? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685p22687.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Serialization error
In that snippet, arguments just holds the values; the config key names are what matter, and all you need to do is set those when creating the SparkConf object. Glad it worked. On Tue, Apr 28, 2015 at 5:20 PM, madhvi madhvi.gu...@orkash.com wrote: Thank you Deepak. It worked. Madhvi On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:

    val conf = new SparkConf()
      .setAppName("detail")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
      .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
      .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
      .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))

Can you try this? On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com wrote: Hi, While connecting to Accumulo through Spark by making a Spark RDD I am getting the following error: object not serializable (class: org.apache.accumulo.core.data.Key). This is due to the Key class of Accumulo, which does not implement the Serializable interface. How can it be solved so Accumulo can be used with Spark? Thanks, Madhvi -- Deepak
Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...
Can you simply apply the https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter to this? You should be able to do something like this:

    val stats = RDD.map(x => x._2).stats()

-Todd On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io subscripti...@prismalytics.io wrote: Hello Friends: I generated a Pair RDD with K/V pairs, like so:

    rdd1.take(10) # Show a small sample.
    [(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612),
     (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343),
     (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077),
     (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762),
     (u'2013-10-13', 26.246), (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the VALUES for each KEY. I can do so as shown here, which does work:

    countsByKey = sc.broadcast(rdd1.countByKey())
    # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
    rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e. the SUM).
    rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e. COUNT).
    print(rdd1.collect())
    [(u'2013-10-09', 11.235365503035176), (u'2013-10-07', 23.39500642456595), ... snip ... ]

But I wonder if the above semantics/approach is the optimal one, or whether perhaps there is a single API call that handles this common use case. Improvement thoughts welcome. =:) Thank you, nmv
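Note that stats() above computes statistics over all values, not per key. If per-key averages are the goal, the countByKey/broadcast steps can also be collapsed into a single shuffle with plain pair-RDD operations; a minimal PySpark sketch:

    # Pair each value with a count of 1, add up (sum, count) per key in one
    # reduceByKey, then divide -- no countByKey or broadcast needed.
    sums_and_counts = rdd1.mapValues(lambda v: (v, 1)) \
                          .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    averages = sums_and_counts.mapValues(lambda p: p[0] / float(p[1]))
    print(averages.collect())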
Single stream with series of transformations
Hi, I'm following the pattern of filtering data by a certain criterion and then saving the results to a different table. The code below illustrates the idea. The simple integration test I wrote suggests it works, simply asserting that filtered data should be in their respective tables after being filtered. I could be adding more filters to this stream in the future. Is this a bad idea?

    public void run() {
        JavaPairReceiverInputDStream<String, Raw> stream = KafkaUtils.createStream(...);

        // Save to master table
        stream.map((rawTuple) -> {
            return rawTuple._2;
        }).foreach((rawRDD) -> {
            javaFunctions(rawRDD).writerBuilder(keyspace, rawTable, mapToRow(Raw.class)).saveToCassandra();
            return null;
        });

        // Process stream again, filter by a criterion, and save to a different table
        stream.map((rawTuple) -> {
            return rawTuple._2;
        }).filter((raw) -> {
            // Filter by some criterion
        }).map((raw) -> {
            return new Criteria1(raw.getSomeField());
        }).foreachRDD((rdd) -> {
            javaFunctions(rdd).writerBuilder(keyspace, criteria1Table, mapToRow(Criteria1.class)).saveToCassandra();
            return null;
        });

        // Process stream a third time, filter by a different criterion, and save to another table
        stream.map((rawTuple) -> {
            return rawTuple._2;
        }).filter((raw) -> {
            // Filter by another criterion
        }).map((raw) -> {
            return new Criteria2(raw.getOtherField());
        }).foreachRDD((rdd) -> {
            javaFunctions(rdd).writerBuilder(keyspace, criteria2Table, mapToRow(Criteria2.class)).saveToCassandra();
            return null;
        });

        streamingContext.start();
    }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Single-stream-with-series-of-transformations-tp22689.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
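One small refinement worth considering, sketched below in PySpark terms (the streaming context ssc, the Kafka parameters, and the filter/save helpers are all placeholders): map the raw stream once and cache it, so the master-table write and each filter branch reuse the same mapped batches instead of each re-mapping the Kafka stream.

    from pyspark.streaming.kafka import KafkaUtils

    raw = KafkaUtils.createStream(ssc, "zk1:2181", "my-group", {"my-topic": 1})  # placeholder args
    values = raw.map(lambda kv: kv[1])
    values.cache()  # branches below reuse the cached batches instead of re-mapping

    # Branch 1: save everything (stand-in for the master-table write)
    values.foreachRDD(lambda rdd: save_raw(rdd))                        # save_raw is hypothetical

    # Branch 2: filter by one criterion, save to its own table
    values.filter(lambda r: matches_criteria1(r)) \
          .foreachRDD(lambda rdd: save_criteria1(rdd))                  # hypothetical sink

    # Branch 3: a different criterion, a different table
    values.filter(lambda r: matches_criteria2(r)) \
          .foreachRDD(lambda rdd: save_criteria2(rdd))                  # hypothetical sink

    ssc.start()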
Re: Spark partitioning question
Thank you Silvio, I am aware of the groupBy limitations and it is subject to replacement. I did try repartitionAndSortWithinPartitions, but then I end up with maybe too much shuffling: one shuffle from groupByKey and another from the repartition. My expectation was that since N records are partitioned to the same partition ... say 0, doing a mapPartition on the resulting RDD would place all records for partition 0 into a single task on a single node. It seems to me that this is not quite the case, since N can span multiple HDFS blocks and the subsequent mapPartition operation would be parallelized across multiple nodes. In my case I see 2 yarn containers receiving records during a mapPartition operation applied on the sorted partition. I need to test more, but it seems that applying the same partitioner again right before the last mapPartition can help. Best, Marius On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Marius, What's the expected output? I would recommend avoiding the groupByKey if possible, since it's going to force all records for each key to go to a single executor, which may overload it. Also, if you need to sort and repartition, try using repartitionAndSortWithinPartitions to do it in one shot. Thanks, Silvio From: Marius Danciu Date: Tuesday, April 28, 2015 at 8:10 AM To: user Subject: Spark partitioning question Hello all, I have the following Spark (pseudo)code:

    rdd = mapPartitionsWithIndex(...)
          .mapPartitionsToPair(...)
          .groupByKey()
          .sortByKey(comparator)
          .partitionBy(myPartitioner)
          .mapPartitionsWithIndex(...)
          .mapPartitionsToPair( f )

The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However, from the yarn logs I do see that both yarn containers are processing records from the same partition ... and sometimes the overall job fails (due to the code in f which expects a certain order of records): yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records ... for a reason I cannot explain, and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails ... apparently due to inconsistent propagation of sorted records to yarn containers. If any of this makes any sense to you, please let me know what I am missing. Best, Marius
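For reference, a minimal sketch of the repartitionAndSortWithinPartitions route Silvio mentions, shown in PySpark (the partition function and the per-partition processing function are placeholders). Doing the partitioning and the sort in the same shuffle avoids a second shuffle to reorder records, and each output partition is then consumed by exactly one task:

    def route_to_zero(key):                 # placeholder partitioner: everything to partition 0
        return 0

    def process_partition(index, records):  # placeholder per-partition logic
        for key, value in records:          # records arrive sorted by key within the partition
            yield (index, key, value)

    result = (pairs                         # pairs: a (key, value) RDD
              .repartitionAndSortWithinPartitions(numPartitions=2,
                                                  partitionFunc=route_to_zero)
              .mapPartitionsWithIndex(process_partition))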
Re: Scalability of group by
Hi, I can offer a few ideas to investigate regarding your issue here. I've run into resource issues doing shuffle operations with a much smaller dataset than 2B rows. The data is going to be saved to disk by the BlockManager as part of the shuffle and then redistributed across the cluster as relevant to the group by, so the data is going to be replicated during the operation. I might suggest trying to allocate more memory for the executors in your cluster. You might also want to look into configuring the shuffle behaviour more explicitly (https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior). Check the disk usage on the worker nodes; in our case we actually had little disk space to start with and were running out of temporary space for the shuffle operation. I believe you should also be able to find clearer errors in the logs from the worker nodes, if you haven't checked them yet. On Mon, Apr 27, 2015 at 10:02 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: It works on a smaller dataset of 100 rows. Probably I could find the size at which it fails using binary search. However, it would not help me because I need to work with 2B rows. From: ayan guha [mailto:guha.a...@gmail.com] Sent: Monday, April 27, 2015 6:58 PM To: Ulanov, Alexander Cc: user@spark.apache.org Subject: Re: Scalability of group by Hi, Can you test on a smaller dataset to identify whether it is a cluster issue or a scaling issue in Spark? On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, I am running a group by on a dataset of 2B rows of RDD[Row[id, time, value]] in Spark 1.3 as follows: “select id, time, first(value) from data group by id, time”. My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor is allocated 5GB of memory. However, all executors are being lost during the query execution and I get “ExecutorLostFailure”. Could you suggest what might be the reason for it? Could it be that “group by” is implemented as RDD.groupBy so it holds the group by result in memory? What is the workaround? Best regards, Alexander
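Along with a larger --executor-memory at submit time, one knob that is often worth trying for a Spark SQL group by at this scale is spark.sql.shuffle.partitions (default 200), so each task holds a smaller slice of the aggregation state; a sketch, assuming a PySpark SQLContext and a placeholder value:

    # Spread the group-by state over more, smaller shuffle partitions;
    # the right number depends on the cluster and data skew.
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
    result = sqlContext.sql("select id, time, first(value) from data group by id, time")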
Calculating the averages for each KEY in a Pairwise (K,V) RDD ...
Hello Friends: I generated a Pair RDD with K/V pairs, like so:

    rdd1.take(10) # Show a small sample.
    [(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612),
     (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343),
     (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077),
     (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762),
     (u'2013-10-13', 26.246), (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the VALUES for each KEY. I can do so as shown here, which does work:

    countsByKey = sc.broadcast(rdd1.countByKey())
    # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
    rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e. the SUM).
    rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e. COUNT).
    print(rdd1.collect())
    [(u'2013-10-09', 11.235365503035176), (u'2013-10-07', 23.39500642456595), ... snip ... ]

But I wonder if the above semantics/approach is the optimal one, or whether perhaps there is a single API call that handles this common use case. Improvement thoughts welcome. =:) Thank you, nmv
Re: A problem of using spark streaming to capture network packets
Are the tasks on the slaves also running as root? If not, that might explain the problem. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote: 1. The full command line is written in a shell script: LIB=/home/spark/.m2/repository /opt/spark/bin/spark-submit \ --class spark.pcap.run.TestPcapSpark \ --jars $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/ org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \ /home/spark/napa/napa.jar 2. And we run this script with *sudo*, if you do not use sudo, then you cannot access network interface. 3. We also tested ListPcapNetworkInterface nifs = Pcaps.findAllDevs() in a standard Java program, it really worked like a champion. Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets [image: Inactive hide details for Dean Wampler ---2015/04/28 20:07:54---It's probably not your code. What's the full command line you u]Dean Wampler ---2015/04/28 20:07:54---It's probably not your code. What's the full command line you use to submit the job? From: Dean Wampler deanwamp...@gmail.com To: Hai Shan Wu/China/IBM@IBMCN Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN Date: 2015/04/28 20:07 Subject: Re: A problem of using spark streaming to capture network packets -- It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected: ListPcapNetworkInterface nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. Author: *Programming Scala, 2nd Edition* http://shop.oreilly.com/product/0636920033073.do (O'Reilly) *Typesafe* http://typesafe.com/ *@deanwampler* http://twitter.com/deanwampler *http://polyglotprogramming.com* http://polyglotprogramming.com/ On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu *wuh...@cn.ibm.com* wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on spark standalone cluster, then the program will tell us that NO NIFs found. I also attach two test files for clarification. So anyone can help on this? Thanks in advance! * (See attached file: PcapReceiver.java)(See attached file: TestPcapSpark.java)* Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: *wuh...@cn.ibm.com* wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: *user-unsubscr...@spark.apache.org* user-unsubscr...@spark.apache.org For additional commands, e-mail: *user-h...@spark.apache.org* user-h...@spark.apache.org
Question about Memory Used and VCores Used
Hi guys, I have the following computation with 3 workers:

    spark-sql --master yarn --executor-memory 3g --executor-cores 2 --driver-memory 1g -e 'select count(*) from table'

The resources used are shown as below on the UI (screenshot not included). I don't understand why the memory used is 15GB and the vcores used is 5. I think the memory used should be executor-memory * numOfWorkers = 3G * 3 = 9G, and the vcores used should be executor-cores * numOfWorkers = 6. Can you please explain the result? Thanks. bit1...@163.com
Re: HBase HTable constructor hangs
I am exactly having same issue. I am running hbase and spark in docker container. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HBase-HTable-constructor-hangs-tp4926p22696.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MLLib SVMWithSGD is failing for large dataset
Hi Sarath, It might be questionable to set num-executors as 64 if you only has 8 nodes. Do you use any action like collect which will overwhelm the driver since you have a large dataset? Thanks On Tue, Apr 28, 2015 at 10:50 AM, sarath sarathkrishn...@gmail.com wrote: I am trying to train a large dataset consisting of 8 million data points and 20 million features using SVMWithSGD. But it is failing after running for some time. I tried increasing num-partitions, driver-memory, executor-memory, driver-max-resultSize. Also I tried by reducing the size of dataset from 8 million to 25K (keeping number of features same 20 M). But after using the entire 64GB driver memory for 20 to 30 min it failed. I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM). executor-memory - 60G driver-memory - 60G num-executors - 64 And other default settings This is the error log : 15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 is closed 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 
15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks java.io.IOException: Failed to connect to xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.ConnectException: Connection refused: xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 at
RE: Why Spark is much faster than Hadoop MapReduce even on disk
One reason Spark on disk is faster than MapReduce is Spark’s advanced Directed Acyclic Graph (DAG) engine. MapReduce requires a complex job to be split into multiple Map-Reduce jobs, with disk I/O at the end of each job and at the beginning of the next. With Spark, you may be able to express the same job with fewer stages, incurring less disk I/O. Disk I/O is an expensive operation, so fewer disk I/O operations translate to better performance. Mohammed From: Ilya Ganelin [mailto:ilgan...@gmail.com] Sent: Monday, April 27, 2015 7:55 PM To: bit1...@163.com; user Subject: Re: Why Spark is much faster than Hadoop MapReduce even on disk I believe the typical answer is that Spark is actually a bit slower. On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com wrote: Hi, I am frequently asked why Spark is much faster than Hadoop MapReduce even on disk (without the use of a memory cache). I have no convincing answer for this question; could you guys elaborate on this? Thanks!
Re: Why Spark is much faster than Hadoop MapReduce even on disk
Our experience is that, unless you can benefit from Spark features such as co-partitioning that allow for more efficient execution, Spark is slightly slower for disk-to-disk jobs. On Apr 27, 2015 10:34 PM, bit1...@163.com bit1...@163.com wrote: Hi, I am frequently asked why Spark is much faster than Hadoop MapReduce even on disk (without the use of a memory cache). I have no convincing answer for this question; could you guys elaborate on this? Thanks!
Re: A problem of using spark streaming to capture network packets
1. The full command line is written in a shell script: LIB=/home/spark/.m2/repository /opt/spark/bin/spark-submit \ --class spark.pcap.run.TestPcapSpark \ --jars $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/ org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \ /home/spark/napa/napa.jar 2. And we run this script with sudo, if you do not use sudo, then you cannot access network interface. 3. We also tested ListPcapNetworkInterface nifs = Pcaps.findAllDevs() in a standard Java program, it really worked like a champion. Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets From: Dean Wampler deanwamp...@gmail.com To: Hai Shan Wu/China/IBM@IBMCN Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN Date: 2015/04/28 20:07 Subject:Re: A problem of using spark streaming to capture network packets It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected: ListPcapNetworkInterface nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition (O'Reilly) Typesafe @deanwampler http://polyglotprogramming.com On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on spark standalone cluster, then the program will tell us that NO NIFs found. I also attach two test files for clarification. So anyone can help on this? Thanks in advance! (See attached file: PcapReceiver.java)(See attached file: TestPcapSpark.java) Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark streaming - textFileStream/fileStream - Get file name
I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this; maybe you could file a JIRA describing what you would like the community to support, so anyone who is interested can take a crack at it. Thanks, Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using Spark Streaming and listening to the files in HDFS via the textFileStream/fileStream methods, how do we get the file names which are read by these methods? I used textFileStream, which has the file contents in a JavaDStream, and I had no success with fileStream as it is throwing me a compilation error with Spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: HBase HTable constructor hangs
Can you give us more information ? Such as hbase release, Spark release. If you can pastebin jstack of the hanging HTable process, that would help. BTW I used http://search-hadoop.com/?q=spark+HBase+HTable+constructor+hangs and saw a very old thread with this subject. Cheers On Tue, Apr 28, 2015 at 7:12 PM, tridib tridib.sama...@live.com wrote: I am exactly having same issue. I am running hbase and spark in docker container. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HBase-HTable-constructor-hangs-tp4926p22696.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: A problem of using spark streaming to capture network packets
btw, from spark web ui, the acl is marked with root Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets From: Dean Wampler deanwamp...@gmail.com To: Lin Hao Xu/China/IBM@IBMCN Cc: Hai Shan Wu/China/IBM@IBMCN, user user@spark.apache.org Date: 2015/04/29 09:40 Subject:Re: A problem of using spark streaming to capture network packets Are the tasks on the slaves also running as root? If not, that might explain the problem. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition (O'Reilly) Typesafe @deanwampler http://polyglotprogramming.com On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote: 1. The full command line is written in a shell script: LIB=/home/spark/.m2/repository /opt/spark/bin/spark-submit \ --class spark.pcap.run.TestPcapSpark \ --jars $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/ org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \ /home/spark/napa/napa.jar 2. And we run this script with sudo, if you do not use sudo, then you cannot access network interface. 3. We also tested ListPcapNetworkInterface nifs = Pcaps.findAllDevs() in a standard Java program, it really worked like a champion. Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets Inactive hide details for Dean Wampler ---2015/04/28 20:07:54---It's probably not your code. What's the full command line you uDean Wampler ---2015/04/28 20:07:54---It's probably not your code. What's the full command line you use to submit the job? From: Dean Wampler deanwamp...@gmail.com To: Hai Shan Wu/China/IBM@IBMCN Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN Date: 2015/04/28 20:07 Subject: Re: A problem of using spark streaming to capture network packets It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected: ListPcapNetworkInterface nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition (O'Reilly) Typesafe @deanwampler http://polyglotprogramming.com On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on spark standalone cluster, then the program will tell us that NO NIFs found. I also attach two test files for clarification. So anyone can help on this? Thanks in advance! (See attached file: PcapReceiver.java)(See attached file: TestPcapSpark.java) Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialization error
Thank you Deepak. It worked. Madhvi On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:

    val conf = new SparkConf()
      .setAppName("detail")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
      .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
      .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
      .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))

Can you try this? On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com wrote: Hi, While connecting to Accumulo through Spark by making a Spark RDD I am getting the following error: object not serializable (class: org.apache.accumulo.core.data.Key). This is due to the Key class of Accumulo, which does not implement the Serializable interface. How can it be solved so Accumulo can be used with Spark? Thanks, Madhvi -- Deepak
Re: 1.3.1: Persisting RDD in parquet - Conflicting partition column names
Can you show your code please? On 28 Apr 2015 13:20, sranga sra...@gmail.com wrote: Hi I am getting the following error when persisting an RDD in parquet format to an S3 location. This is code that was working in the 1.2 version. The version that it is failing to work is 1.3.1. Any help is appreciated. Caused by: java.lang.AssertionError: assertion failed: Conflicting partition column names detected: ArrayBuffer(batch_id) ArrayBuffer() at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.sql.parquet.ParquetRelation2$.resolvePartitions(newParquet.scala:933) at org.apache.spark.sql.parquet.ParquetRelation2$.parsePartitions(newParquet.scala:851) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:311) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:303) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:303) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:692) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196) at org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:995) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/1-3-1-Persisting-RDD-in-parquet-Conflicting-partition-column-names-tp22678.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to add jars to standalone pyspark program
Can you specifiy 'running via PyCharm'. how are you executing the script, with spark-submit? In PySpark I guess you used --jars databricks-csv.jar. With spark-submit you might need the additional --driver-class-path databricks-csv.jar. Both parameters cannot be set via the SparkConf object. Cheers, Fabian On 04/28/2015 10:06 AM, mj wrote: Hi, I'm trying to figure out how to use a third party jar inside a python program which I'm running via PyCharm in order to debug it. I am normally able to run spark code in python such as this: spark_conf = SparkConf().setMaster('local').setAppName('test') sc = SparkContext(conf=spark_conf) cars = sc.textFile('c:/cars.csv') print cars.count() sc.stop() The code I'm trying to run is below - it uses the databricks spark csv jar. I can get it working fine in pyspark shell using the packages argument, but I can't figure out how to get it to work via PyCharm. from pyspark.sql import SQLContext from pyspark import SparkConf, SparkContext spark_conf = SparkConf().setMaster('local').setAppName('test') sc = SparkContext(conf=spark_conf) sqlContext = SQLContext(sc) df = sqlContext.load(source=com.databricks.spark.csv, header=true, path = c:/cars.csv, delimiter='\t') df.select(year) The error message I'm getting is: py4j.protocol.Py4JJavaError: An error occurred while calling o20.load. : java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) I presume I need to set the spark classpath somehow but I'm not sure of the right way to do it. Any advice/guidance would be appreciated. Thanks, Mark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
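If the script is launched straight from the IDE (so there is no spark-submit command line to put --jars on), one workaround is to set PYSPARK_SUBMIT_ARGS in the environment before the SparkContext is created; a sketch with placeholder paths (note that some releases require the value to end in pyspark-shell):

    import os
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--jars /path/to/spark-csv_2.10.jar,/path/to/commons-csv.jar "   # placeholder jars
        "--driver-class-path /path/to/spark-csv_2.10.jar "
        "pyspark-shell"
    )

    from pyspark import SparkConf, SparkContext
    sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("test"))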
Re: A problem of using spark streaming to capture network packets
It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected: ListPcapNetworkInterface nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on spark standalone cluster, then the program will tell us that NO NIFs found. I also attach two test files for clarification. So anyone can help on this? Thanks in advance! *(See attached file: PcapReceiver.java)**(See attached file: TestPcapSpark.java)* Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: submitting to multiple masters
According to the docs it should go like this: spark://host1:port1,host2:port2 https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper Thanks M On Apr 28, 2015, at 8:13 AM, James King jakwebin...@gmail.com wrote: I have multiple masters running and I'm trying to submit an application using spark-1.3.0-bin-hadoop2.4/bin/spark-submit with this config (i.e. a comma separated list of master urls) --master spark://master01:7077,spark://master02:7077 But getting this exception Exception in thread main org.apache.spark.SparkException: Invalid master URL: spark://spark://master02:7077 What am I doing wrong? Many Thanks jk
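That is, a single spark:// scheme with the hosts comma-separated, for example (the class name and application jar are placeholders):

    spark-1.3.0-bin-hadoop2.4/bin/spark-submit \
      --master spark://master01:7077,master02:7077 \
      --class com.example.Main \
      myapp.jar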
Spark partitioning question
Hello all, I have the following Spark (pseudo)code:

    rdd = mapPartitionsWithIndex(...)
          .mapPartitionsToPair(...)
          .groupByKey()
          .sortByKey(comparator)
          .partitionBy(myPartitioner)
          .mapPartitionsWithIndex(...)
          .mapPartitionsToPair( f )

The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However, from the yarn logs I do see that both yarn containers are processing records from the same partition ... and sometimes the overall job fails (due to the code in f which expects a certain order of records): yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records ... for a reason I cannot explain, and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails ... apparently due to inconsistent propagation of sorted records to yarn containers. If any of this makes any sense to you, please let me know what I am missing. Best, Marius
Re: How to add jars to standalone pyspark program
Its a windows thing. Please escape front slash in string. Basically it is not able to find the file On 28 Apr 2015 22:09, Fabian Böhnlein fabian.boehnl...@gmail.com wrote: Can you specifiy 'running via PyCharm'. how are you executing the script, with spark-submit? In PySpark I guess you used --jars databricks-csv.jar. With spark-submit you might need the additional --driver-class-path databricks-csv.jar. Both parameters cannot be set via the SparkConf object. Cheers, Fabian On 04/28/2015 10:06 AM, mj wrote: Hi, I'm trying to figure out how to use a third party jar inside a python program which I'm running via PyCharm in order to debug it. I am normally able to run spark code in python such as this: spark_conf = SparkConf().setMaster('local').setAppName('test') sc = SparkContext(conf=spark_conf) cars = sc.textFile('c:/cars.csv') print cars.count() sc.stop() The code I'm trying to run is below - it uses the databricks spark csv jar. I can get it working fine in pyspark shell using the packages argument, but I can't figure out how to get it to work via PyCharm. from pyspark.sql import SQLContext from pyspark import SparkConf, SparkContext spark_conf = SparkConf().setMaster('local').setAppName('test') sc = SparkContext(conf=spark_conf) sqlContext = SQLContext(sc) df = sqlContext.load(source=com.databricks.spark.csv, header=true, path = c:/cars.csv, delimiter='\t') df.select(year) The error message I'm getting is: py4j.protocol.Py4JJavaError: An error occurred while calling o20.load. : java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) I presume I need to set the spark classpath somehow but I'm not sure of the right way to do it. Any advice/guidance would be appreciated. Thanks, Mark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: StandardScaler failing with OOM errors in PySpark
That's exactly what I'm saying -- I specify the memory options using spark options, but this is not reflected in how the JVM is created. No matter which memory settings I specify, the JVM for the driver is always made with 512Mb of memory. So I'm not sure if this is a feature or a bug? rok On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng men...@gmail.com wrote: You might need to specify driver memory in spark-submit instead of passing JVM options. spark-submit is designed to handle different deployments correctly. -Xiangrui On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote: ok yes, I think I have narrowed it down to being a problem with driver memory settings. It looks like the application master/driver is not being launched with the settings specified: For the driver process on the main node I see -XX:MaxPermSize=128m -Xms512m -Xmx512m as options used to start the JVM, even though I specified 'spark.yarn.am.memory', '5g' 'spark.yarn.am.memoryOverhead', '2000' The info shows that these options were read: 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120 MB memory including 2000 MB overhead Is there some reason why these options are being ignored and instead starting the driver with just 512Mb of heap? On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote: the feature dimension is 800k. yes, I believe the driver memory is likely the problem since it doesn't crash until the very last part of the tree aggregation. I'm running it via pyspark through YARN -- I have to run in client mode so I can't set spark.driver.memory -- I've tried setting the spark.yarn.am.memory and overhead parameters but it doesn't seem to have an effect. Thanks, Rok On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote: What is the feature dimension? Did you set the driver memory? -Xiangrui On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote: I'm trying to use the StandardScaler in pyspark on a relatively small (a few hundred Mb) dataset of sparse vectors with 800k features. The fit method of StandardScaler crashes with Java heap space or Direct buffer memory errors. There should be plenty of memory around -- 10 executors with 2 cores each and 8 Gb per core. I'm giving the executors 9g of memory and have also tried lots of overhead (3g), thinking it might be the array creation in the aggregators that's causing issues. The bizarre thing is that this isn't always reproducible -- sometimes it actually works without problems. Should I be setting up executors differently? Thanks, Rok -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-defaults.conf
So, no takers regarding why spark-defaults.conf is not being picked up. Here is another one: if ZooKeeper is configured in Spark, why do we need to start a slave like this:

    spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh 1 spark://somemaster:7077

i.e. why do we need to specify the master URL explicitly? Shouldn't Spark just consult ZK and use the active master? Or is ZK only used during failure? On Mon, Apr 27, 2015 at 1:53 PM, James King jakwebin...@gmail.com wrote: Thanks. I've set SPARK_HOME and SPARK_CONF_DIR appropriately in .bash_profile, but when I start a worker like this

    spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

I still get failed to launch org.apache.spark.deploy.worker.Worker: Default is conf/spark-defaults.conf. 15/04/27 11:51:33 DEBUG Utils: Shutdown hook called On Mon, Apr 27, 2015 at 1:15 PM, Zoltán Zvara zoltan.zv...@gmail.com wrote: You should distribute your configuration file to the workers and set the appropriate environment variables, like HADOOP_HOME, SPARK_HOME, HADOOP_CONF_DIR, SPARK_CONF_DIR. On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com wrote: I renamed spark-defaults.conf.template to spark-defaults.conf and invoked spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh, but I still get failed to launch org.apache.spark.deploy.worker.Worker: --properties-file FILE Path to a custom Spark properties file. Default is conf/spark-defaults.conf. But I'm thinking it should pick up the default spark-defaults.conf from the conf dir. Am I expecting or doing something wrong? Regards, jk
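On the ZooKeeper question: in standalone HA the workers are still started against the master URL list; ZooKeeper is only used by the masters themselves for leader election and state recovery, so a worker started against spark://master01:7077,master02:7077 re-registers with whichever master becomes active. A sketch of the master-side configuration, per the standby-masters documentation (the ZooKeeper quorum address is a placeholder):

    # conf/spark-env.sh on each master
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
      -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
      -Dspark.deploy.zookeeper.dir=/spark"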
submitting to multiple masters
I have multiple masters running and I'm trying to submit an application using spark-1.3.0-bin-hadoop2.4/bin/spark-submit with this config (i.e. a comma separated list of master urls) --master spark://master01:7077,spark://master02:7077 But getting this exception Exception in thread main org.apache.spark.SparkException: Invalid master URL: spark://spark://master02:7077 What am I doing wrong? Many Thanks jk
Re: MLLib SVMWithSGD is failing for large dataset
Hi, I'm just calling the standard SVMWithSGD implementation of Spark's MLLib. I'm not using any method like collect. Thanks, Sarath On Tue, Apr 28, 2015 at 4:35 PM, ai he heai0...@gmail.com wrote: Hi Sarath, It might be questionable to set num-executors as 64 if you only has 8 nodes. Do you use any action like collect which will overwhelm the driver since you have a large dataset? Thanks On Tue, Apr 28, 2015 at 10:50 AM, sarath sarathkrishn...@gmail.com wrote: I am trying to train a large dataset consisting of 8 million data points and 20 million features using SVMWithSGD. But it is failing after running for some time. I tried increasing num-partitions, driver-memory, executor-memory, driver-max-resultSize. Also I tried by reducing the size of dataset from 8 million to 25K (keeping number of features same 20 M). But after using the entire 64GB driver memory for 20 to 30 min it failed. I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM). executor-memory - 60G driver-memory - 60G num-executors - 64 And other default settings This is the error log : 15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 is closed 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 
15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks java.io.IOException: Failed to connect to xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at
Re: A problem of using spark streaming to capture network packets
Actually, to simplify this problem, we run our program on a single machine with 4 slave workers. Since on a single machine, I think all slave workers are ran with root privilege. BTW, if we have a cluster, how to make sure slaves on remote machines run program as root? Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets From: Dean Wampler deanwamp...@gmail.com To: Lin Hao Xu/China/IBM@IBMCN Cc: Hai Shan Wu/China/IBM@IBMCN, user user@spark.apache.org Date: 2015/04/29 09:40 Subject:Re: A problem of using spark streaming to capture network packets Are the tasks on the slaves also running as root? If not, that might explain the problem. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition (O'Reilly) Typesafe @deanwampler http://polyglotprogramming.com On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote: 1. The full command line is written in a shell script: LIB=/home/spark/.m2/repository /opt/spark/bin/spark-submit \ --class spark.pcap.run.TestPcapSpark \ --jars $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/ org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \ /home/spark/napa/napa.jar 2. And we run this script with sudo, if you do not use sudo, then you cannot access network interface. 3. We also tested ListPcapNetworkInterface nifs = Pcaps.findAllDevs() in a standard Java program, it really worked like a champion. Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets Inactive hide details for Dean Wampler ---2015/04/28 20:07:54---It's probably not your code. What's the full command line you uDean Wampler ---2015/04/28 20:07:54---It's probably not your code. What's the full command line you use to submit the job? From: Dean Wampler deanwamp...@gmail.com To: Hai Shan Wu/China/IBM@IBMCN Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN Date: 2015/04/28 20:07 Subject: Re: A problem of using spark streaming to capture network packets It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected: ListPcapNetworkInterface nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition (O'Reilly) Typesafe @deanwampler http://polyglotprogramming.com On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on spark standalone cluster, then the program will tell us that NO NIFs found. I also attach two test files for clarification. So anyone can help on this? Thanks in advance! 
(See attached file: PcapReceiver.java)(See attached file: TestPcapSpark.java) Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
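One way to check which user the tasks actually run as is a small probe job that reports the OS user from every executor; a minimal sketch (not from this thread, assuming a plain Spark 1.x standalone setup):

import org.apache.spark.{SparkConf, SparkContext}

object WhoAmIProbe {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("whoami-probe"))
    // One small task per partition; each reports its executor's hostname and OS user.
    val report = sc.parallelize(1 to 100, 100).mapPartitions { _ =>
      val host = java.net.InetAddress.getLocalHost.getHostName
      val user = System.getProperty("user.name")
      Iterator((host, user))
    }.distinct().collect()
    report.foreach { case (host, user) => println(s"$host runs tasks as $user") }
    sc.stop()
  }
}

Note that in standalone mode the executor JVMs are launched by the Worker daemons, so they inherit the Worker's user, not the user who invoked spark-submit; running the submit script under sudo on the driver machine alone would not make the tasks run as root.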
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
Looks to me that the same thing also applies to the SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD to figure out the file information where the data in RDD is from bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:10 To: lokeshkumar CC: spark users Subject: Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this, may be you could file a JIRA with wishes you want the community to support, so anyone who is interested can take a crack on this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using spark streaming and listening to the files in HDFS using textFileStream/fileStream methods, how do we get the fileNames which are read by these methods? I used textFileStream which has file contents in JavaDStream and I got no success with fileStream as it is throwing me a compilation error with spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Best practices on testing Spark jobs
Hi, I have two questions regarding testing Spark jobs: 1. Is it possible to use Mockito for that purpose? I tried to use it, but it looks like there are no interactions with mocks. I didn't dive into the details of how Mockito works, but I guess it might be because of the serialization and how Spark distributes tasks. I'm not sure about it though and I'm looking for confirmation. 2. If not mockito, what's the alternative? What's the recommended way to test Spark jobs? Should I manually create mocks by e.g. extending all the classes I'd normally mock and changing the implementation of some methods? I don't like this idea but I can't really see any other options now. Kind regards, Michał Michalski, michal.michal...@boxever.com
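One common alternative to mocking is to not mock Spark at all and run the transformations against a real SparkContext with a local master inside the test; a minimal ScalaTest sketch (WordLengthJob and its wordLengths method are hypothetical stand-ins for the code under test):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordLengthJobSuite extends FunSuite with BeforeAndAfterAll {
  @transient private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))
  }

  override def afterAll(): Unit = {
    if (sc != null) sc.stop()
  }

  test("wordLengths maps each word to its length") {
    val input = sc.parallelize(Seq("spark", "is", "fun"))
    // WordLengthJob.wordLengths is the (hypothetical) transformation under test.
    val result = WordLengthJob.wordLengths(input).collect().toSet
    assert(result === Set(("spark", 5), ("is", 2), ("fun", 3)))
  }
}

Keeping the job logic in functions that take and return RDDs (rather than building everything inside main) is what makes this style of test practical.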
Re: hive-thriftserver maven artifact
Credit goes to Misha Chernetsov (see SPARK-4925) FYI On Tue, Apr 28, 2015 at 8:25 AM, Marco marco@gmail.com wrote: Thx Ted for the info ! 2015-04-27 23:51 GMT+02:00 Ted Yu yuzhih...@gmail.com: This is available for 1.3.1: http://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver_2.10 FYI On Mon, Feb 16, 2015 at 7:24 AM, Marco marco@gmail.com wrote: Ok, so will it be only available for the next version (1.30)? 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com: I searched for 'spark-hive-thriftserver_2.10' on this page: http://mvnrepository.com/artifact/org.apache.spark Looks like it is not published. On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote: Hi, I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact in a public repository ? I have not found it @Maven Central. Thanks, Marco -- Viele Grüße, Marco -- Viele Grüße, Marco
Re: Best practices on testing Spark jobs
Hi, Can you give some tutorials/examples how to write test case based on the mentioned framework? Thanks, Sourav On Tue, Apr 28, 2015 at 9:22 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Sorry that’s correct, I was thinking you were maybe trying to mock certain aspects of Spark core to write your tests. This is a library to help write unit tests by managing the SparkContext and StreamingContext. So you can test your transformations as necessary. More importantly on the streaming side it really helps simplify running tests on batch outputs. If you’re having serialization issues you may need to look at using transient lazy initializers, to see if that helps? From: Michal Michalski Date: Tuesday, April 28, 2015 at 11:42 AM To: Silvio Fiorito Cc: user Subject: Re: Best practices on testing Spark jobs Thanks Silvio. I might be missing something, but it looks like this project is a kind of a framework for setting up Spark for a testing, but after taking a quick look at the code it doesn't seem like it's solving the problem with mocking which is my main concern now - am I wrong? Kind regards, Michał Michalski, michal.michal...@boxever.com On 28 April 2015 at 16:35, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Michal, Please try spark-testing-base by Holden. I’ve used it and it works well for unit testing batch and streaming jobs https://github.com/holdenk/spark-testing-base Thanks, Silvio From: Michal Michalski Date: Tuesday, April 28, 2015 at 11:32 AM To: user Subject: Best practices on testing Spark jobs Hi, I have two questions regarding testing Spark jobs: 1. Is it possible to use Mockito for that purpose? I tried to use it, but it looks like there are no interactions with mocks. I didn't dive into the details of how Mockito works, but I guess it might be because of the serialization and how Spark distributes tasks. I'm not sure about it though and I'm looking for confirmation. 2. If not mockito, what's the alternative? What's the recommended way to test Spark jobs? Should I manually create mocks by e.g. extending all the classes I'd normally mock and changing the implementation of some methods? I don't like this idea but I can't really see any other options now. Kind regards, Michał Michalski, michal.michal...@boxever.com -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
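Not an official tutorial, but the basic usage of spark-testing-base is roughly the following: mix in its SharedSparkContext trait and it creates and stops the SparkContext around your suite, leaving you to test the transformations. A rough sketch (the filtering logic is just a placeholder):

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

class PositiveFilterSuite extends FunSuite with SharedSparkContext {
  test("filter keeps only positive values") {
    // sc is provided and managed by SharedSparkContext
    val input = sc.parallelize(Seq(-1, 0, 3, 7))
    val result = input.filter(_ > 0).collect().toSet
    assert(result === Set(3, 7))
  }
}

If I recall correctly, the library also ships a streaming counterpart that lets you feed expected input batches to a DStream transformation and compare the resulting output batches, which is what is meant elsewhere in this thread about simplifying tests on batch outputs.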
Re: Best practices on testing Spark jobs
Sorry that’s correct, I was thinking you were maybe trying to mock certain aspects of Spark core to write your tests. This is a library to help write unit tests by managing the SparkContext and StreamingContext. So you can test your transformations as necessary. More importantly on the streaming side it really helps simplify running tests on batch outputs. If you’re having serialization issues you may need to look at using transient lazy initializers, to see if that helps? From: Michal Michalski Date: Tuesday, April 28, 2015 at 11:42 AM To: Silvio Fiorito Cc: user Subject: Re: Best practices on testing Spark jobs Thanks Silvio. I might be missing something, but it looks like this project is a kind of a framework for setting up Spark for a testing, but after taking a quick look at the code it doesn't seem like it's solving the problem with mocking which is my main concern now - am I wrong? Kind regards, Michał Michalski, michal.michal...@boxever.commailto:michal.michal...@boxever.com On 28 April 2015 at 16:35, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: Hi Michal, Please try spark-testing-base by Holden. I’ve used it and it works well for unit testing batch and streaming jobs https://github.com/holdenk/spark-testing-base Thanks, Silvio From: Michal Michalski Date: Tuesday, April 28, 2015 at 11:32 AM To: user Subject: Best practices on testing Spark jobs Hi, I have two questions regarding testing Spark jobs: 1. Is it possible to use Mockito for that purpose? I tried to use it, but it looks like there are no interactions with mocks. I didn't dive into the details of how Mockito works, but I guess it might be because of the serialization and how Spark distributes tasks. I'm not sure about it though and I'm looking for confirmation. 2. If not mockito, what's the alternative? What's the recommended way to test Spark jobs? Should I manually create mocks by e.g. extending all the classes I'd normally mock and changing the implementation of some methods? I don't like this idea but I can't really see any other options now. Kind regards, Michał Michalski, michal.michal...@boxever.commailto:michal.michal...@boxever.com
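To make the transient lazy initializer suggestion concrete: the usual pattern is to mark the non-serializable member @transient lazy so it is skipped during closure serialization and rebuilt on the executor side the first time it is used. A minimal sketch with a made-up client class:

import org.apache.spark.rdd.RDD

// Stand-in for some non-serializable resource (hypothetical).
class LookupClient(endpoint: String) {
  def lookup(key: String): String = s"$endpoint/$key"
}

class EnrichmentJob(endpoint: String) extends Serializable {
  // Not serialized with the closure; re-created on the executor side when first touched.
  @transient lazy val client: LookupClient = new LookupClient(endpoint)

  def enrich(records: RDD[String]): RDD[String] = records.map(r => client.lookup(r))
}

Only the lightweight constructor arguments travel with the task; the client itself is constructed where the tasks run. If the resource is expensive, holding it in a companion object behind a lazy val gives one instance per executor JVM instead of one per task.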
Re: Spark partitioning question
So the other issue could due to the fact that using mapPartitions after the partitionBy, you essentially lose the partitioning of the keys since Spark assumes the keys were altered in the map phase. So really the partitionBy gets lost after the mapPartitions, that’s why you need to do it again. From: Marius Danciu Date: Tuesday, April 28, 2015 at 9:53 AM To: Silvio Fiorito, user Subject: Re: Spark partitioning question Thank you Silvio, I am aware of groubBy limitations and this is subject for replacement. I did try repartitionAndSortWithinPartitions but then I end up with maybe too much shuffling one from groupByKey and the other from repartition. My expectation was that since N records are partitioned to the same partition ...say 0, doing a mapPartition on the resulting RDD would place all records for partition 0 into a single on a single node. Seems to me that this is not quite the case since N can span to multiple HDFS blocks and subsequent mapPartition operation would be paralelized on multiple nodes. In my case I see 2 yarn containers receiving records during a mapPartition operation applied on the sorted partition. I need to test more but it seems that applying the same partitioner again right before the last mapPartition can help. Best, Marius On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: Hi Marius, What’s the expected output? I would recommend avoiding the groupByKey if possible since it’s going to force all records for each key to go to an executor which may overload it. Also if you need to sort and repartition, try using repartitionAndSortWithinPartitions to do it in one shot. Thanks, Silvio From: Marius Danciu Date: Tuesday, April 28, 2015 at 8:10 AM To: user Subject: Spark partitioning question Hello all, I have the following Spark (pseudo)code: rdd = mapPartitionsWithIndex(...) .mapPartitionsToPair(...) .groupByKey() .sortByKey(comparator) .partitionBy(myPartitioner) .mapPartitionsWithIndex(...) .mapPartitionsToPair( f ) The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However from yarn logs I do see that both yarn containers are processing records from the same partition ... and sometimes the over all job fails (due to the code in f which expects a certain order of records) and yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records ... for a reason I cannot explain and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails ... apparently due to inconsistent propagation of sorted records to yarn containers. If any of this makes any sense to you, please let me know what I am missing. Best, Marius
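A small illustration of that point: mapPartitions (and map) drop the partitioner by default because Spark cannot know whether the keys were changed, but if the function leaves the keys untouched you can assert that with preservesPartitioning. A sketch, assuming a plain pair RDD:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PreservePartitioningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("preserve-partitioning").setMaster("local[2]"))
    val partitioner = new HashPartitioner(4)
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).partitionBy(partitioner)

    // Keys are left untouched, so it is safe to tell Spark the partitioner still applies.
    val mapped = pairs.mapPartitions(
      iter => iter.map { case (k, v) => (k, v * 10) },
      preservesPartitioning = true
    )

    println(mapped.partitioner)  // Some(HashPartitioner) -- no re-shuffle for downstream keyed operations
    sc.stop()
  }
}

For value-only transformations, mapValues does the same thing implicitly, since it cannot change the keys at all.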
Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...
If you need to keep the keys, you can use aggregateByKey to calculate an avg of the values: val step1 = data.aggregateByKey((0.0, 0))((a, b) => (a._1 + b, a._2 + 1), (a, b) => (a._1 + b._1, a._2 + b._2)) val avgByKey = step1.mapValues(i => i._1 / i._2) Essentially, what this is doing is passing an initializer for sum and count, then summing each pair of values and counting the number of values. The last argument is to combine the results of each partition, if the data was spread across partitions. The result is a tuple of sum and count for each key. Use mapValues to keep your partitioning by keys intact and minimize a full shuffle for downstream keyed operations. It just calculates the avg for each key. From: Todd Nist Date: Tuesday, April 28, 2015 at 10:20 AM To: subscripti...@prismalytics.io Cc: user@spark.apache.org Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ... Can you simply apply the StatCounter (https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter) to this? You should be able to do something like this: val stats = RDD.map(x => x._2).stats() -Todd On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io subscripti...@prismalytics.io wrote: Hello Friends: I generated a Pair RDD with K/V pairs, like so: rdd1.take(10) # Show a small sample. [(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612), (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343), (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077), (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762), (u'2013-10-13', 26.246), (u'2013-10-13', 10.693069306930692)] Now from the above RDD, I would like to calculate an average of the VALUES for each KEY. I can do so as shown here, which does work: countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...} rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerator (i.e. the SUM). rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. COUNT) print(rdd1.collect()) [(u'2013-10-09', 11.235365503035176), (u'2013-10-07', 23.39500642456595), ... snip ... ] But I wonder if the above semantics/approach is the optimal one; or whether perhaps there is a single API call that handles this common use case. Improvement thoughts welcome. =:) Thank you, nmv
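If more than the mean is needed per key, another option along the same lines (a sketch, reusing the data RDD from the reply above, i.e. an RDD of (key, Double) pairs) is to aggregate a StatCounter per key, which accumulates count, mean, stdev, min and max in a single pass:

import org.apache.spark.util.StatCounter

val statsByKey = data.aggregateByKey(new StatCounter())(
  (acc, v) => acc.merge(v),   // fold one value into the per-partition accumulator
  (a, b) => a.merge(b)        // combine accumulators from different partitions
)
val avgByKey = statsByKey.mapValues(_.mean)   // .count, .stdev, .min and .max are also available

Like the tuple-based version, this keeps the keys and avoids the separate countByKey pass and broadcast from the original question.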
Re: hive-thriftserver maven artifact
Thx Ted for the info ! 2015-04-27 23:51 GMT+02:00 Ted Yu yuzhih...@gmail.com: This is available for 1.3.1: http://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver_2.10 FYI On Mon, Feb 16, 2015 at 7:24 AM, Marco marco@gmail.com wrote: Ok, so will it be only available for the next version (1.30)? 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com: I searched for 'spark-hive-thriftserver_2.10' on this page: http://mvnrepository.com/artifact/org.apache.spark Looks like it is not published. On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote: Hi, I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact in a public repository ? I have not found it @Maven Central. Thanks, Marco -- Viele Grüße, Marco -- Viele Grüße, Marco
Re: Best practices on testing Spark jobs
Hi Michal, Please try spark-testing-base by Holden. I’ve used it and it works well for unit testing batch and streaming jobs https://github.com/holdenk/spark-testing-base Thanks, Silvio From: Michal Michalski Date: Tuesday, April 28, 2015 at 11:32 AM To: user Subject: Best practices on testing Spark jobs Hi, I have two questions regarding testing Spark jobs: 1. Is it possible to use Mockito for that purpose? I tried to use it, but it looks like there are no interactions with mocks. I didn't dive into the details of how Mockito works, but I guess it might be because of the serialization and how Spark distributes tasks. I'm not sure about it though and I'm looking for confirmation. 2. If not mockito, what's the alternative? What's the recommended way to test Spark jobs? Should I manually create mocks by e.g. extending all the classes I'd normally mock and changing the implementation of some methods? I don't like this idea but I can't really see any other options now. Kind regards, Michał Michalski, michal.michal...@boxever.commailto:michal.michal...@boxever.com
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
I was wondering about the same thing. Vadim ᐧ On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote: Looks to me that the same thing also applies to the SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD to figure out the file information where the data in RDD is from -- bit1...@163.com *From:* Saisai Shao sai.sai.s...@gmail.com *Date:* 2015-04-29 10:10 *To:* lokeshkumar lok...@dataken.net *CC:* spark users user@spark.apache.org *Subject:* Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this, may be you could file a JIRA with wishes you want the community to support, so anyone who is interested can take a crack on this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using spark streaming and listening to the files in HDFS using textFileStream/fileStream methods, how do we get the fileNames which are read by these methods? I used textFileStream which has file contents in JavaDStream and I got no success with fileStream as it is throwing me a compilation error with spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: HBase HTable constructor hangs
I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0. Here is the jstack trace. Complete stack trace attached. Executor task launch worker-1 #58 daemon prio=5 os_prio=0 tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152) - locked 0xf8cb7258 (a org.apache.hadoop.hbase.client.RpcRetryingCaller) at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705) at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162) - locked 0xf84ac0b0 (a java.lang.Object) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011) at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150) at com.mypackage.storeTuples(CubeStoreService.java:59) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Executor task launch worker-0 #57 daemon prio=5 os_prio=0 tid=0x7fd3d0443800 nid=0x487 waiting for monitor entry [0x7fd4506d8000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1156) - waiting to lock 0xf84ac0b0 (a java.lang.Object) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011) at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150) at com.mypackage.storeTuples(CubeStoreService.java:59) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23) at 
com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Date: Tue, 28 Apr 2015 19:35:26 -0700 Subject: Re: HBase HTable constructor hangs From: yuzhih...@gmail.com To: tridib.sama...@live.com CC: user@spark.apache.org Can you give us more information ?Such as hbase release, Spark release. If you can pastebin jstack of the hanging HTable process, that would help. BTW I used
How to stream all data out of a Kafka topic once, then terminate job?
Hi, I'm wondering about the use case where you're not doing continuous, incremental streaming of data out of Kafka, but rather want to publish data once with your Producer(s) and consume it once in your Consumer, then terminate the consumer Spark job. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(...)); The batchDuration parameter is "the time interval at which streaming data will be divided into batches." Can this be made to work somehow so that Spark Streaming just gets all the available data, lets all the RDDs within the Kafka discretized stream get processed, and is then simply done and terminates, rather than waiting another period and trying to process more data from Kafka? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
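One way to get read-what-is-there-now-then-stop behaviour without a streaming context at all is the direct Kafka batch API that arrived in Spark 1.3: build the offset ranges you want and read them as an ordinary RDD. A rough sketch (broker address, topic name and offsets are placeholders; in practice you would look up each partition's earliest and latest offsets from Kafka instead of hard-coding them):

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object DrainTopicOnce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("drain-topic-once"))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // One range per topic partition: [fromOffset, untilOffset)
    val ranges = Array(
      OffsetRange("my-topic", 0, 0L, 100000L),
      OffsetRange("my-topic", 1, 0L, 100000L)
    )

    val batch = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, ranges)
    batch.map(_._2).saveAsTextFile("/tmp/drained-topic")   // process once; the job then simply ends

    sc.stop()
  }
}

The alternative is to keep the streaming job but stop the StreamingContext once an empty batch shows up; the batch-RDD route just avoids that polling loop.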
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
I think it might be useful in Spark Streaming's file input stream, but not sure is it useful in SparkContext#textFile, since we specify the file by our own, so why we still need to know the file name. I will open up a JIRA to mention about this feature. Thanks Jerry 2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: I was wondering about the same thing. Vadim ᐧ On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote: Looks to me that the same thing also applies to the SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD to figure out the file information where the data in RDD is from -- bit1...@163.com *From:* Saisai Shao sai.sai.s...@gmail.com *Date:* 2015-04-29 10:10 *To:* lokeshkumar lok...@dataken.net *CC:* spark users user@spark.apache.org *Subject:* Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this, may be you could file a JIRA with wishes you want the community to support, so anyone who is interested can take a crack on this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using spark streaming and listening to the files in HDFS using textFileStream/fileStream methods, how do we get the fileNames which are read by these methods? I used textFileStream which has file contents in JavaDStream and I got no success with fileStream as it is throwing me a compilation error with spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Weird error/exception
I was having this issue when my batch interval was very big -- like 5 minutes. When my batch interval is smaller, I don't get this exception. Can someone explain to me why this might be happening? Vadim ᐧ On Tue, Apr 28, 2015 at 4:26 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming to monitor an S3 bucket. Everything appears to be fine. But every batch interval I get the following: *15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Attempting to release HttpMethod in finalize() as its response data stream has gone out of scope. This attempt will not always succeed and cannot be relied upon! Please ensure response data streams are always fully consumed or closed to avoid HTTP connection starvation.* *15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Successfully released HttpMethod in finalize(). You were lucky this time... Please ensure response data streams are always fully consumed or closed.* *Traceback (most recent call last):* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 162, in manager* *code = worker(sock)* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 60, in worker* *worker_main(infile, outfile)* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/worker.py, line 126, in main* *if read_int(infile) == SpecialLengths.END_OF_STREAM:* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/serializers.py, line 528, in read_int* *raise EOFError* *EOFError* Does anyone know the cause of this and how to fix it? Thanks, Vadim ᐧ
External Application Run Status
Hi, In a multi-node setup, I am invoking a number of external apps through Runtime.getRuntime.exec from an rdd.map function, and would like to track their completion status. Evidently, such calls spawn a separate process that is not tracked by the standalone scheduler, i.e., reduce or collect are called prior to completion of these tasks. I am wondering: 1- is there a way to delay calls to reduce/collect until their respective external app processes have completed, and 2- if not, is it possible to query the node id and pid of such processes? Cheers, Nastooh Avessta ENGINEER.SOFTWARE ENGINEERING nave...@cisco.com Phone: +1 604 647 1527 Cisco Systems Limited, 595 Burrard Street, Suite 2123, Three Bentall Centre, PO Box 49121, Vancouver, British Columbia V7X 1J1, CA http://www.cisco.com/
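On (1), a common trick is to make the task itself block until the external program exits: if the process is started with ProcessBuilder and waitFor() is called inside the same map (or mapPartitions) call, downstream reduce/collect cannot start until every external process has finished, because the tasks are still running. A sketch (the command path is a placeholder):

import org.apache.spark.rdd.RDD

// For each input record, run an external command, block until it exits,
// and return the executor host and exit code so the driver can inspect them.
def runExternal(input: RDD[String]): RDD[(String, String, Int)] = {
  input.map { arg =>
    val host = java.net.InetAddress.getLocalHost.getHostName
    val proc = new ProcessBuilder("/usr/local/bin/my_external_app", arg)   // placeholder command
      .redirectErrorStream(true)
      .start()
    val exitCode = proc.waitFor()   // the Spark task does not complete until the external app does
    (host, arg, exitCode)
  }
}

// Driver side: collect() returns only after every external process has exited.
// val statuses = runExternal(argsRdd).collect()

On (2), the host name is easy to report as above, but java.lang.Process does not expose the child's pid before Java 9; a workaround is to have the external app write its own pid to a known location if it is needed.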
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
For the SparkContext#textFile, if a directory is given as the path parameter ,then it will pick up the files in the directory, so the same thing will occur. bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:54 To: Vadim Bichutskiy CC: bit1...@163.com; lokeshkumar; user Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name I think it might be useful in Spark Streaming's file input stream, but not sure is it useful in SparkContext#textFile, since we specify the file by our own, so why we still need to know the file name. I will open up a JIRA to mention about this feature. Thanks Jerry 2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: I was wondering about the same thing. Vadim ᐧ On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote: Looks to me that the same thing also applies to the SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD to figure out the file information where the data in RDD is from bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:10 To: lokeshkumar CC: spark users Subject: Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this, may be you could file a JIRA with wishes you want the community to support, so anyone who is interested can take a crack on this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using spark streaming and listening to the files in HDFS using textFileStream/fileStream methods, how do we get the fileNames which are read by these methods? I used textFileStream which has file contents in JavaDStream and I got no success with fileStream as it is throwing me a compilation error with spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
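For the SparkContext side of this thread there are at least two workarounds that existed at the time; a sketch (paths are placeholders, sc is an existing SparkContext):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// Option 1: small files -- wholeTextFiles already returns (fileName, fileContent) pairs.
val byFile = sc.wholeTextFiles("hdfs:///data/incoming")

// Option 2: line-oriented files -- drop down to the HadoopRDD and tag every line with its file.
val hadoopRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/incoming")
  .asInstanceOf[HadoopRDD[LongWritable, Text]]

val linesWithFile = hadoopRdd.mapPartitionsWithInputSplit(
  (split: InputSplit, iter: Iterator[(LongWritable, Text)]) => {
    val file = split.asInstanceOf[FileSplit].getPath.toString
    iter.map { case (_, line) => (file, line.toString) }
  },
  preservesPartitioning = true
)

The streaming file sources had no equivalent public hook at the time, which is what the proposed JIRA is about.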
Re: HBase HTable constructor hangs
How did you distribute hbase-site.xml to the nodes ? Looks like HConnectionManager couldn't find the hbase:meta server. Cheers On Tue, Apr 28, 2015 at 9:19 PM, Tridib Samanta tridib.sama...@live.com wrote: I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0. Here is the jstack trace. Complete stack trace attached. Executor task launch worker-1 #58 daemon prio=5 os_prio=0 tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152) - locked 0xf8cb7258 (a org.apache.hadoop.hbase.client.RpcRetryingCaller) at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705) at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162) - locked 0xf84ac0b0 (a java.lang.Object) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011) at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150) at com.mypackage.storeTuples(CubeStoreService.java:59) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Executor task launch worker-0 #57 daemon prio=5 os_prio=0 tid=0x7fd3d0443800 nid=0x487 waiting for monitor entry [0x7fd4506d8000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1156) - waiting to lock 0xf84ac0b0 (a java.lang.Object) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011) at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192) at 
org.apache.hadoop.hbase.client.HTable.init(HTable.java:150) at com.mypackage.storeTuples(CubeStoreService.java:59) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- Date: Tue, 28 Apr 2015 19:35:26 -0700
RE: HBase HTable constructor hangs
I am 100% sure how it's picking up the configuration. I copied the hbase-site.xml in hdfs/spark cluster (single machine). I also included hbase-site.xml in spark-job jar files. spark-job jar file also have yarn-site and mapred-site and core-site.xml in it. One interesting thing is, when I run the spark-job jar as standalone and execute the HBase client from a main method, it works fine. Same client unable to connect/hangs when the jar is distributed in spark. Thanks Tridib Date: Tue, 28 Apr 2015 21:25:41 -0700 Subject: Re: HBase HTable constructor hangs From: yuzhih...@gmail.com To: tridib.sama...@live.com CC: user@spark.apache.org How did you distribute hbase-site.xml to the nodes ? Looks like HConnectionManager couldn't find the hbase:meta server. Cheers On Tue, Apr 28, 2015 at 9:19 PM, Tridib Samanta tridib.sama...@live.com wrote: I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0. Here is the jstack trace. Complete stack trace attached. Executor task launch worker-1 #58 daemon prio=5 os_prio=0 tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152) - locked 0xf8cb7258 (a org.apache.hadoop.hbase.client.RpcRetryingCaller) at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705) at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162) - locked 0xf84ac0b0 (a java.lang.Object) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011) at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150) at com.mypackage.storeTuples(CubeStoreService.java:59) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Executor task launch worker-0 #57 daemon prio=5 os_prio=0 tid=0x7fd3d0443800 nid=0x487 waiting for monitor entry [0x7fd4506d8000] java.lang.Thread.State: 
BLOCKED (on object monitor) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1156) - waiting to lock 0xf84ac0b0 (a java.lang.Object) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011) at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150) at com.mypackage.storeTuples(CubeStoreService.java:59) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:23) at com.mypackage.StorePartitionToHBaseStoreFunction.call(StorePartitionToHBaseStoreFunction.java:13) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) at
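One thing that has helped others when the same HBase client code works as a standalone main but hangs inside Spark executors is to stop relying on hbase-site.xml being found on the executor classpath, and instead build the Configuration explicitly in the code that runs on the executors. A sketch (ZooKeeper hosts and table name are placeholders, rdd is the RDD being written, HBase 0.98-era client API as in the stack trace):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

rdd.foreachPartition { partition =>
  // Built on the executor itself, so it does not depend on what that JVM finds on its classpath.
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
  conf.set("hbase.zookeeper.property.clientPort", "2181")

  val table = new HTable(conf, "my_table")
  try {
    partition.foreach { record =>
      // build a Put per record and table.put(...) it (omitted)
    }
  } finally {
    table.close()
  }
}

If the constructor still hangs with an explicit quorum, the next thing to check is whether the executor hosts can actually reach ZooKeeper and the region servers, since the prefetchRegionCache call in the trace is waiting on hbase:meta.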
How to run self-built Spark on EC2?
Hi all, I have an issue. I added some timestamps in the Spark source code and built it using: mvn package -DskipTests I checked the new version on my own computer and it works. However, when I ran Spark on EC2, the Spark code the EC2 machines ran was the original version. Does anyone know how to deploy the changed Spark source code to EC2? Thx a lot Bo Fu - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
solr in spark
Has anyone tried using Solr inside Spark? Below is the project describing it: https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 million company names and then search them as and when new data comes in; the output should be the list of companies matching the query. Spark has built-in Elasticsearch integration, but for this purpose Elasticsearch may not be a good option since this is purely a text search problem; Elasticsearch is good for filtering and grouping. Has anybody used Solr inside Spark? Regards jeetendra
Re: solr in spark
Thanks for the reply. Will the Elasticsearch index be within my cluster, or do I need to host Elasticsearch separately? On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote: I haven't used Solr for a long time, and haven't used Solr in Spark. However, why do you say Elasticsearch is not a good option ...? ES absolutely supports full-text search and not just filtering and grouping (in fact its original purpose was and still is text search, though filtering, grouping and aggregation are heavily used). http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com wrote: Does anyone tried using solr inside spark? below is the project describing it. https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 millions companies name and then search as and when new data comes in. the output should be list of companies matching the query. Spark has inbuilt elastic search but for this purpose Elastic search is not a good option since this is totally text search problem? Elastic search is good for filtering and grouping. Does any body used solr inside spark? Regards jeetendra
Re: solr in spark
I haven't used Solr for a long time, and haven't used Solr in Spark. However, why do you say Elasticsearch is not a good option ...? ES absolutely supports full-text search and not just filtering and grouping (in fact its original purpose was and still is text search, though filtering, grouping and aggregation are heavily used). http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com wrote: Does anyone tried using solr inside spark? below is the project describing it. https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 millions companies name and then search as and when new data comes in. the output should be list of companies matching the query. Spark has inbuilt elastic search but for this purpose Elastic search is not a good option since this is totally text search problem? Elastic search is good for filtering and grouping. Does any body used solr inside spark? Regards jeetendra
Re: solr in spark
AFAIK Datastax is looking at it heavily; they have a good integration of Cassandra with it, and the next step was clearly to have a strong combination of the three in one of the coming releases. On Tue, Apr 28, 2015 at 18:28, Jeetendra Gangele gangele...@gmail.com wrote: Does anyone tried using solr inside spark? below is the project describing it. https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 millions companies name and then search as and when new data comes in. the output should be list of companies matching the query. Spark has inbuilt elastic search but for this purpose Elastic search is not a good option since this is totally text search problem? Elastic search is good for filtering and grouping. Does any body used solr inside spark? Regards jeetendra
PySpark: slicing issue with dataframes
Hi experts, Trying to use the slicing functionality in strings as part of a Spark program (PySpark) I get this error: Code import pandas as pd from pyspark.sql import SQLContext hc = SQLContext(sc) A = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'], 'Lastname': ['Jones', 'Bajwa', 'Day']}) a = hc.createDataFrame(A) print A b = a.select(a.Firstname[:2]) print b.toPandas() c = a.select(a.Lastname[2:]) print c.toPandas() Output: Firstname Lastname 0 JamesJones 1 AliBajwa 2Daniel Day SUBSTR(Firstname, 0, 2) 0 Ja 1 Al 2 Da --- Py4JError Traceback (most recent call last) ipython-input-17-6ee5d7d069ce in module() 10 b = a.select(a.Firstname[:2]) 11 print b.toPandas() --- 12 c = a.select(a.Lastname[2:]) 13 print c.toPandas() /home/jupyter/spark-1.3.1/python/pyspark/sql/dataframe.pyc in substr(self, startPos, length) 1089 raise TypeError(Can not mix the type) 1090 if isinstance(startPos, (int, long)): - 1091 jc = self._jc.substr(startPos, length) 1092 elif isinstance(startPos, Column): 1093 jc = self._jc.substr(startPos._jc, length._jc) /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 302 raise Py4JError( 303 'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'. -- 304 format(target_id, '.', name, value)) 305 else: 306 raise Py4JError( Py4JError: An error occurred while calling o1887.substr. Trace: py4j.Py4JException: Method substr([class java.lang.Integer, class java.lang.Long]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) at py4j.Gateway.invoke(Gateway.java:252) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Looks like X[:2] works but X[2:] fails with the error above Anyone else have this issue? Clearly I can use substr() to workaround this, but if this is a confirmed bug we should open a JIRA. Thanks, Ali
Re: Question regarding join with multiple columns with pyspark
Thanks again Ayan! To close the loop on this issue, I have filed the below JIRA to track the issue: https://issues.apache.org/jira/browse/SPARK-7197 On Fri, Apr 24, 2015 at 8:21 PM, ayan guha guha.a...@gmail.com wrote: I just tested, your observation in DataFrame API is correct. It behaves weirdly in case of multiple column join. (Maybe we should report a Jira?) Solution: You can go back to our good old composite key field concatenation method. Not ideal, but workaround. (Of course you can use realSQL as well, as shown below) set up Data: a = [[1993,1,100],[1993,2,200],[1994,1,1000],[1994,3,3000],[2000,1,1]] b = [[1993,1,A],[1994,1,AA],[2000,1,AAA]] YM1 = sc.parallelize(a).map(lambda tup: Row(yr=int(tup[0]),mn = int(tup[1]), price = int(tup[2]),joiningKey=str(tup[0])+~+str(tup[1]))) YM2 = sc.parallelize(b).map(lambda tup: Row(yr=int(tup[0]),mn = int(tup[1]), name = str(tup[2]),joiningKey=str(tup[0])+~+str(tup[1]))) print YM1.collect() print YM2.collect() YM1DF = ssc.createDataFrame(YM1) YM2DF = ssc.createDataFrame(YM2) print YM1DF.printSchema() print YM2DF.printSchema() This DOES NOT WORK --- YMJN = YM1DF.join(YM2DF,YM1DF.yr==YM2DF.yr and YM1DF.mn==YM2DF.mn,inner) print YMJN.printSchema() for l in YMJN.collect(): print l Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1', mn=1, name=u'A', yr=1993) Row(joiningKey=u'1994~1', mn=1, price=100, yr=1994, joiningKey=u'1994~1', mn=1, name=u'AA', yr=1994) Row(joiningKey=u'2000~1', mn=1, price=100, yr=2000, joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000) Row(joiningKey=u'1993~1', mn=1, price=1000, yr=1993, joiningKey=u'1993~1', mn=1, name=u'A', yr=1993) Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1', mn=1, name=u'AA', yr=1994) Row(joiningKey=u'2000~1', mn=1, price=1000, yr=2000, joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000) Row(joiningKey=u'1993~1', mn=1, price=1, yr=1993, joiningKey=u'1993~1', mn=1, name=u'A', yr=1993) Row(joiningKey=u'1994~1', mn=1, price=1, yr=1994, joiningKey=u'1994~1', mn=1, name=u'AA', yr=1994) Row(joiningKey=u'2000~1', mn=1, price=1, yr=2000, joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000) - SQL Solution - works as expected YM1DF.registerTempTable(ymdf1) YM2DF.registerTempTable(ymdf2) YMJNS = ssc.sql(select * from ymdf1 inner join ymdf2 on ymdf1.yr=ymdf2.yr and ymdf1.mn=ymdf2.mn) print YMJNS.printSchema() for l in YMJNS.collect(): print l Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1', mn=1, name=u'AA', yr=1994) Row(joiningKey=u'2000~1', mn=1, price=1, yr=2000, joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000) Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1', mn=1, name=u'A', yr=1993) - Field concat method, works as well YMJNA = YM1DF.join(YM2DF,YM1DF.joiningKey==YM2DF.joiningKey,inner) print YMJNA.printSchema() for l in YMJNA.collect(): print l Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1', mn=1, name=u'AA', yr=1994) Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1', mn=1, name=u'A', yr=1993) Row(joiningKey=u'2000~1', mn=1, price=1, yr=2000, joiningKey=u'2000~1', mn=1, name=u'AAA', yr=2000) On Sat, Apr 25, 2015 at 10:18 AM, Ali Bajwa ali.ba...@gmail.com wrote: Any ideas on this? Any sample code to join 2 data frames on two columns? Thanks Ali On Apr 23, 2015, at 1:05 PM, Ali Bajwa ali.ba...@gmail.com wrote: Hi experts, Sorry if this is a n00b question or has already been answered... 
Am trying to use the data frames API in python to join 2 dataframes with more than 1 column. The example I've seen in the documentation only shows a single column - so I tried this: Example code import pandas as pd from pyspark.sql import SQLContext hc = SQLContext(sc) A = pd.DataFrame({'year': ['1993', '2005', '1994'], 'month': ['5', '12', '12'], 'value': [100, 200, 300]}) a = hc.createDataFrame(A) B = pd.DataFrame({'year': ['1993', '1993'], 'month': ['12', '12'], 'value': [101, 102]}) b = hc.createDataFrame(B) print Pandas # try with Pandas print A print B print pd.merge(A, B, on=['year', 'month'], how='inner') print Spark print a.toPandas() print b.toPandas() print a.join(b, a.year==b.year and a.month==b.month, 'inner').toPandas() *Output Pandas month value year 0 5100 1993 112200 2005 212300 1994 month value year 012101 1993 112102 1993 Empty DataFrame Columns: [month, value_x, year, value_y] Index: [] Spark month value year 0 5100 1993
Re: How to deploy self-built Spark source code on EC2
[-dev] [+user] This is a question for the user list, not the dev list. Use the --spark-version and --spark-git-repo options to specify your own repo and hash to deploy. Source code link. https://github.com/apache/spark/blob/268c419f1586110b90e68f98cd000a782d18828c/ec2/spark_ec2.py#L189-L195 Nick On Tue, Apr 28, 2015 at 12:14 PM Bo Fu b...@uchicago.edu http://mailto:b...@uchicago.edu wrote: Hi all, I have an issue. I added some timestamps in Spark source code and built it using: mvn package -DskipTests I checked the new version in my own computer and it works. However, when I ran spark on EC2, the spark code EC2 machines ran is the original version. Anyone knows how to deploy the changed spark source code into EC2? Thx a lot Bo Fu - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Spark - Timeout Issues - OutOfMemoryError
I have a SparkApp that runs completes in 45 mins for 5 files (5*750MB size) and it takes 16 executors to do so. I wanted to run it against 10 files of each input type (10*3 files as there are three inputs that are transformed). [Input1 = 10*750 MB, Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 32 executors. I see multiple 5/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver in heartbeater org.apache.spark.SparkException: Error sending message [message = Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22, phxaishdc9dn1048.stratus.phx.ebay.com, 39505))] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ... 1 more When i searched deeper, i found OOM error. 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819) 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no recent heart beats: 121200ms exceeds 12ms 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304) 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread task-result-getter-3 java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2245) at java.util.Arrays.copyOf(Arrays.java:2219) at java.util.ArrayList.grow(ArrayList.java:242) at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) at java.util.ArrayList.add(ArrayList.java:440) at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173) at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621) at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Exception in thread task-result-getter-3 java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2245) at java.util.Arrays.copyOf(Arrays.java:2219) at java.util.ArrayList.grow(ArrayList.java:242) at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) at java.util.ArrayList.add(ArrayList.java:440) at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173) at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) at
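A note on this thread: the java.lang.OutOfMemoryError above is raised in a task-result-getter thread, which runs in the driver and deserializes task results with Kryo, so the driver heap is the first thing to look at, not only the executors. A hedged sketch of submit-time settings that are commonly tried in this situation — the sizes are illustrative, not a diagnosis of this particular job:

spark-submit \
  --driver-memory 8g \
  --conf spark.driver.maxResultSize=4g \
  --conf spark.akka.frameSize=128 \
  --class your.main.Class your-app.jar

spark.driver.maxResultSize does not prevent the OOM by itself, but it makes the job fail with a clear message once the combined task results grow past the limit, instead of exhausting the heap.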
Spark streaming - textFileStream/fileStream - Get file name
Hi Forum, Using spark streaming and listening to the files in HDFS using textFileStream/fileStream methods, how do we get the fileNames which are read by these methods? I used textFileStream which has file contents in JavaDStream and I got no success with fileStream as it is throwing me a compilation error with spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
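Not a streaming-native answer, but if knowing the file name matters more than strict streaming semantics, a small batch job over the newly arrived directory gives (path, content) pairs directly. A rough PySpark sketch — the directory is a placeholder, and this sidesteps textFileStream rather than extending it:

from pyspark import SparkContext

sc = SparkContext(appName="list-file-names")
# wholeTextFiles returns an RDD of (fullFilePath, fileContent) pairs
files = sc.wholeTextFiles("hdfs:///incoming/2015-04-28/")
print(files.keys().collect())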
Weird error/exception
I am using Spark Streaming to monitor an S3 bucket. Everything appears to be fine. But every batch interval I get the following:
15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Attempting to release HttpMethod in finalize() as its response data stream has gone out of scope. This attempt will not always succeed and cannot be relied upon! Please ensure response data streams are always fully consumed or closed to avoid HTTP connection starvation.
15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Successfully released HttpMethod in finalize(). You were lucky this time... Please ensure response data streams are always fully consumed or closed.
Traceback (most recent call last):
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 162, in manager
    code = worker(sock)
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 60, in worker
    worker_main(infile, outfile)
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/worker.py, line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/serializers.py, line 528, in read_int
    raise EOFError
EOFError
Does anyone know the cause of this and how to fix it? Thanks, Vadim
RE: Scalability of group by
Richard, The same problem is with sort. I have enough disk space and tmp folder. The errors in logs tell out of memory. I wonder what does it hold in memory? Alexander From: Richard Marscher [mailto:rmarsc...@localytics.com] Sent: Tuesday, April 28, 2015 7:34 AM To: Ulanov, Alexander Cc: user@spark.apache.org Subject: Re: Scalability of group by Hi, I can offer a few ideas to investigate in regards to your issue here. I've run into resource issues doing shuffle operations with a much smaller dataset than 2B. The data is going to be saved to disk by the BlockManager as part of the shuffle and then redistributed across the cluster as relevant to the group by. So the data is going to be replicated during the operation. I might suggest trying to allocate more memory for your executors in your cluster. You might also want to look into configuring more explicitly the shuffle functionality (https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior). Check the disk usage on the worker nodes, in our case we actually had small disk space to start with and were running out of temporary space for the shuffle operation. I believe you should also be able to find more clear errors in logs from the worker nodes if you haven't checked yet. On Mon, Apr 27, 2015 at 10:02 PM, Ulanov, Alexander alexander.ula...@hp.commailto:alexander.ula...@hp.com wrote: It works on a smaller dataset of 100 rows. Probably I could find the size when it fails using binary search. However, it would not help me because I need to work with 2B rows. From: ayan guha [mailto:guha.a...@gmail.commailto:guha.a...@gmail.com] Sent: Monday, April 27, 2015 6:58 PM To: Ulanov, Alexander Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Scalability of group by Hi Can you test on a smaller dataset to identify if it is cluster issue or scaling issue in spark On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.commailto:alexander.ula...@hp.com wrote: Hi, I am running a group by on a dataset of 2B of RDD[Row [id, time, value]] in Spark 1.3 as follows: “select id, time, first(value) from data group by id, time” My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor is allocated with 5GB of memory. However, all executors are being lost during the query execution and I get “ExecutorLostFailure”. Could you suggest what might be the reason for it? Could it be that “group by” is implemented as RDD.groupBy so it holds the group by result in memory? What is the workaround? Best regards, Alexander
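One more idea for the original query: since it only needs first(value) per (id, time), the same answer can be computed without ever materializing whole groups, which is what usually blows up executors during a group by. A rough PySpark sketch at the RDD level — it assumes data is an RDD of (id, time, value) rows, and it illustrates the technique rather than the plan Spark SQL generates for that statement:

# key by (id, time) and keep a single value per key; reduceByKey combines
# map-side, so no complete group is ever held in memory
pairs = data.map(lambda row: ((row[0], row[1]), row[2]))
first_per_key = pairs.reduceByKey(lambda a, b: a)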
Re: Spark SQL 1.3.1 saveAsParquetFile will output tachyon file with different block size
Hi, You can apply this patch https://github.com/apache/spark/pull/5354 and recompile. Hope this helps, Calvin On Tue, Apr 28, 2015 at 1:19 PM, sara mustafa eng.sara.must...@gmail.com wrote: Hi Zhang, How did you compile Spark 1.3.1 with Tachyon? when i changed Tachyon version to 0.6.3 in core/pom.xml, make-distribution.sh and try to compile again, many compilation errors raised. Thanks, -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-1-3-1-saveAsParquetFile-will-output-tachyon-file-with-different-block-size-tp11561p11870.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Initial tasks in job take time
yes On 29 Apr 2015 03:31, ayan guha guha.a...@gmail.com wrote: Are your driver running on the same m/c as master? On 29 Apr 2015 03:59, Anshul Singhle ans...@betaglide.com wrote: Hi, I'm running short spark jobs on rdds cached in memory. I'm also using a long running job context. I want to be able to complete my jobs (on the cached rdd) in under 1 sec. I'm getting the following job times with about 15 GB of data distributed across 6 nodes. Each executor has about 20GB of memory available. My context has about 26 cores in total. If number of partitions no of cores - Some jobs run in 3s others take about 6s -- the time difference can be explained by GC time. If number of partitions = no of cores - All jobs run in 4s. The initial tasks of each stage on every executor take about 1s. If partitions cores - Jobs take more time. The initial tasks of each stage on every executor take about 1s. The other tasks run in 45-50 ms each. However, since the initial tasks again take about 1s each, the total time in this case is about 6s which is more than the previous case. Clearly the limiting factor here is the initial set of tasks. For every case, these tasks take 1s to run, no matter the amount of partitions. Hence best results are obtained with partitions = cores, because in that case. every core gets 1 task which takes 1s to run. In this case, I get 0 GC time. The only explanation is scheduling delay which is about 0.2 - 0.3 seconds. I looked at my task size and result size and that has no bearing on this delay. Also, I'm not getting the task size warnings in the logs. For what I can understand, the first time a task runs on a core, it takes 1s to run. Is this normal? Is it possible to get sub-second latencies? Can something be done about the scheduler delay? What other things can I look at to reduce this time? Regards, Anshul
rdd.count with 100 elements taking 1 second to run
Hi, I'm running the following code in my cluster (standalone mode) via spark shell - val rdd = sc.parallelize(1 to 100) rdd.count This takes around 1.2s to run. Is this expected, or am I configuring something wrong? I'm using about 30 cores with 512MB of executor memory. As expected, GC time is negligible. I'm just getting some scheduler delay and 1s to launch the task. Thanks, Anshul
Re: Initial tasks in job take time
Are your driver running on the same m/c as master? On 29 Apr 2015 03:59, Anshul Singhle ans...@betaglide.com wrote: Hi, I'm running short spark jobs on rdds cached in memory. I'm also using a long running job context. I want to be able to complete my jobs (on the cached rdd) in under 1 sec. I'm getting the following job times with about 15 GB of data distributed across 6 nodes. Each executor has about 20GB of memory available. My context has about 26 cores in total. If number of partitions no of cores - Some jobs run in 3s others take about 6s -- the time difference can be explained by GC time. If number of partitions = no of cores - All jobs run in 4s. The initial tasks of each stage on every executor take about 1s. If partitions cores - Jobs take more time. The initial tasks of each stage on every executor take about 1s. The other tasks run in 45-50 ms each. However, since the initial tasks again take about 1s each, the total time in this case is about 6s which is more than the previous case. Clearly the limiting factor here is the initial set of tasks. For every case, these tasks take 1s to run, no matter the amount of partitions. Hence best results are obtained with partitions = cores, because in that case. every core gets 1 task which takes 1s to run. In this case, I get 0 GC time. The only explanation is scheduling delay which is about 0.2 - 0.3 seconds. I looked at my task size and result size and that has no bearing on this delay. Also, I'm not getting the task size warnings in the logs. For what I can understand, the first time a task runs on a core, it takes 1s to run. Is this normal? Is it possible to get sub-second latencies? Can something be done about the scheduler delay? What other things can I look at to reduce this time? Regards, Anshul
Metric collection
Hi, I would like to collect some metrics from Spark and plot them with Graphite. I managed to do that with the metrics provided by org.apache.spark.metrics.source.JvmSource, but I would like to know if there are other sources available besides this one. Best, Giovanni
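For reference, sinks and extra sources are wired up in conf/metrics.properties (the distribution ships a metrics.properties.template listing the options). Besides JvmSource, Spark registers its internal scheduler, executor and BlockManager sources automatically and exposes them through whatever sink is configured, so Graphite should receive those without extra work. A sketch of a Graphite setup — host and port are placeholders:

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# JVM metrics for the driver and executors
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource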
Spark Sql: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi all. I was launching a spark sql job on my own machine, not on the spark cluster machines, and failed. The excpetion info is: 15/04/28 16:28:04 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient) Exception in thread Driver java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231) at scala.Option.orElse(Option.scala:257) at org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231) at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveMetastoreCatalog.init(HiveMetastoreCatalog.scala:55) at org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:253) at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:253) at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:253) at org.apache.spark.sql.hive.HiveContext$$anon$4.init(HiveContext.scala:263) at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:263) at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:262) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at com.nd.huayuedu.DoExecute$.doExecute(DoExecute.scala:16) at com.nd.huayuedu.HiveFromSpark$.main(HiveFromSpark.scala:30) at com.nd.huayuedu.HiveFromSpark.main(HiveFromSpark.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:441) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 
27 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:525) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) ... 32 more Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339) at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:70) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130) at org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58) at
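The root cause in the trace is the ClassNotFoundException for org.datanucleus.api.jdo.JDOPersistenceManagerFactory, which usually means the datanucleus jars (shipped in the lib directory of a Hive-enabled Spark build) and hive-site.xml never reached the YARN container. A hedged sketch of what is often passed explicitly in this case — the paths and jar versions are illustrative, use whatever your Spark build actually ships:

spark-submit --master yarn-cluster \
  --files /path/to/hive-site.xml \
  --jars /path/to/datanucleus-api-jdo-3.2.6.jar,/path/to/datanucleus-core-3.2.10.jar,/path/to/datanucleus-rdbms-3.2.9.jar \
  --class com.nd.huayuedu.HiveFromSpark your-app.jar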
[Spark SQL] Problems creating a table in specified schema/database
Hey all, I'm trying to create tables from existing Parquet data in different schemata. The following isn't working for me: CREATE DATABASE foo; CREATE TABLE foo.bar USING com.databricks.spark.avro OPTIONS (path '...'); -- Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 'USING' 'com' '.' in table name; line 1 pos 13 (state=,code=0) I also tried USE foo; CREATE TABLE bar USING com.databricks.spark.avro OPTIONS (path '...'); -- Creates the table successfully, but in the default.* schema. This is on Spark 1.3.1, running on YARN, Hive 0.13.1. Any suggestions? Should this work? James.
Re: New JIRA - [SQL] Can't remove columns from DataFrame or save DataFrame from a join due to duplicate columns
The alias function is not in Python yet. I suggest writing SQL if your data suits it. On 28 Apr 2015 14:42, Don Drake dondr...@gmail.com wrote: https://issues.apache.org/jira/browse/SPARK-7182 Can anyone suggest a workaround for the above issue? Thanks. -Don -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ http://www.MailLaunder.com/
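A sketch of the write-SQL workaround in PySpark, in case it helps: register both DataFrames as temp tables and alias the duplicate columns in the query itself. Table and column names below are made up for illustration:

df1.registerTempTable("left_t")
df2.registerTempTable("right_t")
joined = sqlContext.sql("""
    SELECT l.id, l.value AS left_value, r.value AS right_value
    FROM left_t l JOIN right_t r ON l.id = r.id
""")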
Re: Serialization error
val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))
Can you try this? On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com wrote: Hi, While connecting to accumulo through spark by making sparkRDD I am getting the following error: object not serializable (class: org.apache.accumulo.core.data.Key) This is due to the 'key' class of accumulo which does not implement serializable interface. How it can be solved and accumulo can be used with spark Thanks Madhvi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Deepak
Re: solr in spark
Depends on your use case and search volume. Typically you'd have a dedicated ES cluster if your app is doing a lot of real time indexing and search. If it's only for spark integration then you could colocate ES and spark — Sent from Mailbox On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for reply. Elastic search index will be within my Cluster? or I need the separate host the elastic search? On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote: I haven't used Solr for a long time, and haven't used Solr in Spark. However, why do you say Elasticsearch is not a good option ...? ES absolutely supports full-text search and not just filtering and grouping (in fact it's original purpose was and still is text search, though filtering, grouping and aggregation are heavily used). http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com wrote: Does anyone tried using solr inside spark? below is the project describing it. https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 millions companies name and then search as and when new data comes in. the output should be list of companies matching the query. Spark has inbuilt elastic search but for this purpose Elastic search is not a good option since this is totally text search problem? Elastic search is good for filtering and grouping. Does any body used solr inside spark? Regards jeetendra
Re: JAVA_HOME problem
Are you using a Spark build that matches your YARN cluster version? That seems like it could happen if you're using a Spark built against a newer version of YARN than you're running. On Thu, Apr 2, 2015 at 12:53 AM, 董帅阳 917361...@qq.com wrote: spark 1.3.0 spark@pc-zjqdyyn1:~ tail /etc/profile export JAVA_HOME=/usr/jdk64/jdk1.7.0_45 export PATH=$PATH:$JAVA_HOME/bin # # End of /etc/profile # But ERROR LOG Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454 LogType: stderr LogLength: 61 Log Contents: /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory LogType: stdout LogLength: 0 Log Contents: -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Initial tasks in job take time
Hi, I'm running short spark jobs on rdds cached in memory. I'm also using a long running job context. I want to be able to complete my jobs (on the cached rdd) in under 1 sec. I'm getting the following job times with about 15 GB of data distributed across 6 nodes. Each executor has about 20GB of memory available. My context has about 26 cores in total. If number of partitions < no of cores - Some jobs run in 3s, others take about 6s -- the time difference can be explained by GC time. If number of partitions = no of cores - All jobs run in 4s. The initial tasks of each stage on every executor take about 1s. If partitions > cores - Jobs take more time. The initial tasks of each stage on every executor take about 1s. The other tasks run in 45-50 ms each. However, since the initial tasks again take about 1s each, the total time in this case is about 6s, which is more than the previous case. Clearly the limiting factor here is the initial set of tasks. For every case, these tasks take 1s to run, no matter the amount of partitions. Hence best results are obtained with partitions = cores, because in that case every core gets 1 task which takes 1s to run. In this case, I get 0 GC time. The only explanation is scheduling delay, which is about 0.2 - 0.3 seconds. I looked at my task size and result size and that has no bearing on this delay. Also, I'm not getting the task size warnings in the logs. From what I can understand, the first time a task runs on a core, it takes 1s to run. Is this normal? Is it possible to get sub-second latencies? Can something be done about the scheduler delay? What other things can I look at to reduce this time? Regards, Anshul
default number of reducers
In a normal MR job, can I configure (cluster-wide) a default number of reducers that is used if I don't specify any reducers in my job?
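This is more a Hadoop question than a Spark one, but yes: for classic MapReduce the cluster-wide default reducer count can be set in mapred-site.xml, and jobs that set their own value still override it. A sketch — the value is just an example:

<property>
  <name>mapreduce.job.reduces</name>
  <value>10</value>
</property>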
Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...
The initializer is a tuple (0, 0) it seems you just have 0 From: subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io Organization: PRISMALYTICS, LLC. Reply-To: subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io Date: Tuesday, April 28, 2015 at 1:28 PM To: Silvio Fiorito, Todd Nist Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ... Thank you Todd, Silvio... I had to stare at Silvio's answer for a while. If I'm interpreting the aggregateByKey() statement correctly ... (Within-Partition Reduction Step) a: is a TUPLE that holds: (runningSum, runningCount). b: is a SCALAR that holds the next Value (Cross-Partition Reduction Step) a: is a TUPLE that holds: (runningSum, runningCount). b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount). Under that interpretation, I tried to write run the Python equivalent, like so: rdd1.aggregateByKey(0, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1])) Sadly, it didn't work, yielding the following exception which indicates that the indexing above is incorrect: lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) File input, line 1, in lambda TypeError: 'int' object has no attribute '__getitem__' Sidenote: Surprisingly, there isn't much documentation -- at least not for Python -- for this useful aggregateByKey() method and use case; although I will be sure to write a gist today, once I get this working. :) I think I'm nearly there though, so... (1) Is my written interpretation above about of what (a,b) are correct? (2) If yes, what then, is getting passed in the Python case? I guess I'm looking for the Python equivalent to the first statement in Silvio's answer (below). But my reasoning to deconstruct and reconstruct is missing something. Thanks again! On 04/28/2015 11:26 AM, Silvio Fiorito wrote: If you need to keep the keys, you can use aggregateByKey to calculate an avg of the values: val step1 = data.aggregateByKey((0.0, 0))((a, b) = (a._1 + b, a._2 + 1), (a, b) = (a._1 + b._1, a._2 + b._2)) val avgByKey = step1.mapValues(i = i._1/i._2) Essentially, what this is doing is passing an initializer for sum and count, then summing each pair of values and counting the number of values. The last argument is to combine the results of each partition, if the data was spread across partitions. The result is a tuple of sum and count for each key. Use mapValues to keep your partitioning by keys intact and minimize a full shuffle for downstream keyed operations. It just calculates the avg for each key. From: Todd Nist Date: Tuesday, April 28, 2015 at 10:20 AM To: subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ... Can you simply apply the https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter to this? You should be able to do something like this: val stats = RDD.map(x = x._2).stats() -Todd On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io wrote: Hello Friends: I generated a Pair RDD with K/V pairs, like so: rdd1.take(10) # Show a small sample. 
[(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612), (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343), (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077), (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762), (u'2013-10-13', 26.246), (u'2013-10-13', 10.693069306930692)] Now from the above RDD, I would like to calculate an average of the VALUES for each KEY. I can do so as shown here, which does work: countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...} rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerator (i.e. the SUM). rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT) print(rdd1.collect()) [(u'2013-10-09', 11.235365503035176), (u'2013-10-07', 23.39500642456595), ... snip ... ] But I wonder if the above semantics/approach is the optimal one; or whether perhaps there is a single API call that handles common use case. Improvement thoughts welcome. =:) Thank you, nmv -- Sincerely yours, Team PRISMALYTICS PRISMALYTICS, LLC.http://www.prismalytics.com/| www.prismalytics.comhttp://www.prismalytics.com/ P: 212.882.1276tel:212.882.1276 | subscripti...@prismalytics.iomailto:subscripti...@prismalytics.io [Follow Us:
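To make the fix concrete: the zero value has to be the (sum, count) tuple itself, not the scalar 0, otherwise a[0] indexes into an int and you get the '__getitem__' error. A corrected PySpark version of the call (variable names follow the thread):

sums_counts = rdd1.aggregateByKey(
    (0.0, 0),                                  # zero value: (runningSum, runningCount)
    lambda a, b: (a[0] + b, a[1] + 1),         # within-partition: fold one value into the tuple
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # across partitions: merge two tuples
avg_by_key = sums_counts.mapValues(lambda t: t[0] / t[1])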
Code For Loading Graph from Edge Tuple File
Hi Everyone, Does anyone have example code for generating a graph from a file of edge name-edge name tuples? I've seen the example where a Graph is generated from an RDD of triplets composed of edge longs, but I'd like to see an example where a graph is built from a edge name-edge -name file such as main street,first street\\n US Interstate, first street\\n, etc... Thanks --John -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Code-For-Loading-Graph-from-Edge-Tuple-File-tp22693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...
Thank you Todd, Silvio... I had to stare at Silvio's answer for a while. _If I'm interpreting the aggregateByKey() statement__correctly ... _ (Within-Partition Reduction Step) a: is a TUPLE that holds: (runningSum, runningCount). b: is a SCALAR that holds the next Value (Cross-Partition Reduction Step) a: is a TUPLE that holds: (runningSum, runningCount). b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount). Under that interpretation, I tried to write run the Python equivalent, like so: rdd1.aggregateByKey(0, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1])) Sadly, it didn't work, yielding the following exception which indicates that the indexing above is incorrect: lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) File input, line 1, in lambda TypeError: 'int' object has no attribute '__getitem__' /Sidenote: Surprisingly, there isn't much documentation -- at least not for Python -- for this useful aggregateByKey()// //method and use case; although I will be sure to write a //g//ist today, once I get this working. :)/ _ __I think I'm nearly there though, so..._ (1) Is my written interpretation above about of what (a,b) are correct? (2) If yes, what then, is getting passed in the Python case? I guess I'm looking for the Python equivalent to the first statement in Silvio's answer (below). But my reasoning to deconstruct and reconstruct is missing something. Thanks again! On 04/28/2015 11:26 AM, Silvio Fiorito wrote: If you need to keep the keys, you can use aggregateByKey to calculate an avg of the values: val step1 = data.aggregateByKey((0.0, 0))((a, b) = (a._1 + b, a._2 + 1), (a, b) = (a._1 + b._1, a._2 + b._2)) val avgByKey = step1.mapValues(i = i._1/i._2) Essentially, what this is doing is passing an initializer for sum and count, then summing each pair of values and counting the number of values. The last argument is to combine the results of each partition, if the data was spread across partitions. The result is a tuple of sum and count for each key. Use mapValues to keep your partitioning by keys intact and minimize a full shuffle for downstream keyed operations. It just calculates the avg for each key. From: Todd Nist Date: Tuesday, April 28, 2015 at 10:20 AM To: subscripti...@prismalytics.io mailto:subscripti...@prismalytics.io Cc: user@spark.apache.org mailto:user@spark.apache.org Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ... Can you simply apply the https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter to this? You should be able to do something like this: val stats = RDD.map(x = x._2).stats() -Todd On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io mailto:subscripti...@prismalytics.io subscripti...@prismalytics.io mailto:subscripti...@prismalytics.io wrote: Hello Friends: I generated a Pair RDD with K/V pairs, like so: rdd1.take(10) # Show a small sample. [(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612), (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343), (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077), (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762), (u'2013-10-13', 26.246), (u'2013-10-13', 10.693069306930692)] Now from the above RDD, I would like to calculate an average of the VALUES for each KEY. 
I can do so as shown here, which does work: * ***countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...} rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerator (i.e. the SUM). rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT) print(rdd1.collect()) [(u'2013-10-09', 11.235365503035176), (u'2013-10-07', 23.39500642456595), ... snip ... ] But I wonder if the above semantics/approach is the optimal one; or whether perhaps there is a single API call that handles common use case. Improvement thoughts welcome. =:) Thank you, nmv -- PRISMALYTICS Sincerely yours, Team PRISMALYTICS PRISMALYTICS, LLC. http://www.prismalytics.com/ | www.prismalytics.com http://www.prismalytics.com/ P: 212.882.1276 tel:212.882.1276 | subscripti...@prismalytics.io mailto:subscripti...@prismalytics.io Follow Us: https://www.LinkedIn.com/company/prismalytics https://www.linkedin.com/company/prismalytics Prismalytics, LLC. http://www.prismalytics.com/ data analytics to literally count on
How to run customized Spark on EC2?
Hi experts, I have an issue. I added some timestamps to the Spark source code and built it using: mvn package -DskipTests I checked the new version on my own computer and it works. However, when I ran Spark on EC2, the code the EC2 machines ran was the original version. Does anyone know how to deploy the changed Spark source code to EC2? Thx a lot Bo Fu - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to setup this false streaming problem
Hi, Just new to Spark and in need of some help for framing the problem I have. A problem well stated is half solved it's the saying :) Let's say that I have a DStream[String] basically containing Json of some measurements from IoT devices. In order to keep it simple say that after unmarshalling I have data like: case class Measurement(val deviceId:Long, val timestamp:Date, val measurement:Double) I need to use DStreams because there is some interest in monitoring real-time the measurements of devices. So let's say that I have a dashboard with hourly granularity, past hours are consolidated but current hour must be kept updated on every input. The problem is that the Time that matters is the timestamp in the Json not the receiving timestamp by Spark so I think that I have to keep a stateful DStream like the one described http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ . I have two questions here: 1. Once a given hour is gone, I'd like to flush the consolidated stream into a DB. I think the strategy is to have a Stream with key-values where the key is (userID, truncateByHour(timestamp)) and reducing over the values. But it seems to me that with this approach Spark has lost any sense of time, how would you flush all the RDDs with timestamps between 00:00:00 and 00:59:59 for instance? Maybe I'm missing some function in the API 2. How do you deal with events that come with timestamps in the past, is it a matter of ignoring them, finding a trade-off between memory and how long the stateful DStream is? But then, who is the one poping the mature time slices from the stateful stream. For me Spark Streaming would be the most natural way to face this problem, but maybe a simple Spark processing run every minute could keep easily the sorting by time of external events. I'd like to hear your thoughts. Toni
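To make the keying idea concrete, here is a rough PySpark sketch of the stateful part. It assumes the parsed objects expose deviceId, timestamp and measurement as in the case class above, and that measurements is the DStream after unmarshalling; when and how to flush completed hours to the DB is still the open question, this only keeps a running per-hour aggregate (note updateStateByKey also requires ssc.checkpoint(...) to be set):

def truncate_to_hour(ts):
    # drop minutes/seconds so all events of one hour share a key
    return ts.replace(minute=0, second=0, microsecond=0)

def update(new_values, state):
    total, count = state if state is not None else (0.0, 0)
    for v in new_values:
        total += v
        count += 1
    return (total, count)

keyed = measurements.map(
    lambda m: ((m.deviceId, truncate_to_hour(m.timestamp)), m.measurement))
hourly = keyed.updateStateByKey(update)  # running (sum, count) per (device, hour)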
MLLib SVMWithSGD is failing for large dataset
I am trying to train a large dataset consisting of 8 million data points and 20 million features using SVMWithSGD. But it is failing after running for some time. I tried increasing num-partitions, driver-memory, executor-memory, driver-max-resultSize. Also I tried by reducing the size of dataset from 8 million to 25K (keeping number of features same 20 M). But after using the entire 64GB driver memory for 20 to 30 min it failed. I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM). executor-memory - 60G driver-memory - 60G num-executors - 64 And other default settings This is the error log : 15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 is closed 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ... 
15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks java.io.IOException: Failed to connect to xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.ConnectException: Connection refused: xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at
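Not a diagnosis of the connection failures above, but two knobs that have helped in similar SVMWithSGD runs are persisting the training set so each SGD iteration does not recompute it, and sampling only a fraction of it per iteration via miniBatchFraction. A hedged PySpark sketch — the parameter values are illustrative and labeled_points stands for your RDD of LabeledPoint:

from pyspark import StorageLevel
from pyspark.mllib.classification import SVMWithSGD

training = labeled_points.persist(StorageLevel.MEMORY_AND_DISK)
model = SVMWithSGD.train(training,
                         iterations=100,
                         miniBatchFraction=0.1)  # use 10% of the points per iteration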
Spark 1.3.1 JavaStreamingContext - fileStream compile error
Hi Forum I am facing below compile error when using the fileStream method of the JavaStreamingContext class. I have copied the code from JavaAPISuite.java test class of spark test code. The error message is: The method fileStream(String, Class<K>, Class<V>, Class<F>, Function<Path, Boolean>, boolean) in the type JavaStreamingContext is not applicable for the arguments (String, Class<LongWritable>, Class<Text>, Class<TextInputFormat>, new Function<Path, Boolean>(){}, boolean) Please help me to find a solution for this. http://apache-spark-user-list.1001560.n3.nabble.com/file/n22683/47.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-JavaStreamingContext-fileStream-compile-error-tp22683.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming: JavaDStream compute method NPE
Hi Puneith, Please provide the code if you may. It will be helpful. Thank you, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-JavaDStream-compute-method-NPE-tp22676p22684.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JAVA_HOME problem
I was able to solve this problem hard coding the JAVA_HOME inside org.apache.spark.deploy.yarn.Client.scala class. *val commands = prefixEnv ++ Seq(-- YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + /bin/java, -server++ /usr/java/jdk1.7.0_51/bin/java, -server)* Somehow {{JAVA_HOME}} was not getting resolved in the node of yarn container. This change has fixed the problem. Now I am getting a new error. *Container: container_1430123808466_36297_02_01 === LogType: stderr LogLength: 87 Log Contents: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher LogType: stdout LogLength: 0 Log Contents:* Looks like now classpath variables are not resolved in yarn node. I have mapreduce jobs running in the same cluster working without any problem. Any pointer why this could happen? Thanks Sourabh On Fri, Apr 24, 2015 at 3:52 PM, sourabh chaki chaki.sour...@gmail.com wrote: Yes Akhil. This is the same issue. I have updated my comment in that ticket. Thanks Sourabh On Fri, Apr 24, 2015 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Isn't this related to this https://issues.apache.org/jira/browse/SPARK-6681 Thanks Best Regards On Fri, Apr 24, 2015 at 11:40 AM, sourabh chaki chaki.sour...@gmail.com wrote: I am also facing the same problem with spark 1.3.0 and yarn-client and yarn-cluster mode. Launching yarn container failed and this is the error in stderr: Container: container_1429709079342_65869_01_01 === LogType: stderr LogLength: 61 Log Contents: /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory LogType: stdout LogLength: 0 Log Contents: I have added JAVA_HOME in hadoop-env.sh as well spark-env.sh grep JAVA_HOME /etc/hadoop/conf.cloudera.yarn/hadoop-env.sh export JAVA_HOME=/usr/java/default export PATH=$PATH:$JAVA_HOME/bin/java grep JAVA_HOME /var/spark/spark-1.3.0-bin-hadoop2.4/conf/spark-env.sh export JAVA_HOME=/usr/java/default I could see another thread for the same problem but I dont see any solution. http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0 Any pointer will be helpful. Thanks Sourabh On Thu, Apr 2, 2015 at 1:23 PM, 董帅阳 917361...@qq.com wrote: spark 1.3.0 spark@pc-zjqdyyn1:~ tail /etc/profile export JAVA_HOME=/usr/jdk64/jdk1.7.0_45 export PATH=$PATH:$JAVA_HOME/bin # # End of /etc/profile # But ERROR LOG Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454 LogType: stderr LogLength: 61 Log Contents: /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory LogType: stdout LogLength: 0 Log Contents:
Re: Spark 1.3.1 JavaStreamingContext - fileStream compile error
How about: JavaPairDStream<LongWritable, Text> input = jssc.fileStream(inputDirectory, LongWritable.class, Text.class, TextInputFormat.class); See the complete example over here https://github.com/databricks/learning-spark/blob/98c4d57f3a611269823c53d9249ab4fbb3065a0c/src/main/java/com/oreilly/learningsparkexamples/java/logs/ReadTransferStats.java Thanks Best Regards On Tue, Apr 28, 2015 at 11:32 AM, lokeshkumar lok...@dataken.net wrote: Hi Forum I am facing below compile error when using the fileStream method of the JavaStreamingContext class. I have copied the code from JavaAPISuite.java test class of spark test code. The error message is The method fileStream(String, Class<K>, Class<V>, Class<F>, Function<Path, Boolean>, boolean) in the type JavaStreamingContext is not applicable for the arguments (String, Class<LongWritable>, Class<Text>, Class<TextInputFormat>, new Function<Path, Boolean>(){}, boolean) Please help me to find a solution for this. http://apache-spark-user-list.1001560.n3.nabble.com/file/n22683/47.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-JavaStreamingContext-fileStream-compile-error-tp22683.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to debug Spark on Yarn?
On 27 Apr 2015, at 07:51, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Spark 1.3 1. View stderr/stdout from executor from Web UI: when the job is running i figured out the executor that am suppose to see, and those two links show 4 special characters on browser. 2. Tail on Yarn logs: /apache/hadoop/bin/yarn logs -applicationId application_1429087638744_151059 | less Threw me: Application has not completed. Logs are only available after an application completes Any other ideas that i can try ? There's some stuff on log streaming of running Apps on Hadoop 2.6+ which can stream logs of running apps to HDFS. I don't know if spark supports that (I haven't tested it) so won't give the details right now. You can go from the RM to the node managers running the containers, and view the logs that way. From some other notes of mine: One configuration to aid debugging is to tell the nodemanagers to keep data for a short period after containers finish:
<!-- 10 minutes after a failure to see what is left in the directory -->
<property>
  <name>yarn.nodemanager.delete.debug-delay-sec</name>
  <value>600</value>
</property>
You can then retrieve logs either through the web UI, or by connecting to the server (usually by ssh) and retrieving the logs from the log directory. We also recommend making sure that YARN kills processes:
<!-- time before the process gets a -9 -->
<property>
  <name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
  <value>3</value>
</property>
Re: java.lang.UnsupportedOperationException: empty collection
I've tried running your code through spark-shell on both 1.3.0 (pre-built for Hadoop 2.4 and above) and a recently built snapshot of master. Both work fine. Running on OS X yosemite. What's your configuration? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-UnsupportedOperationException-empty-collection-tp22677p22686.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialization error
On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: val conf = new SparkConf() .setAppName(detail) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryoserializer.buffer.mb, arguments.get(buffersize).get) .set(spark.kryoserializer.buffer.max.mb, arguments.get(maxbuffersize).get) .set(spark.driver.maxResultSize, arguments.get(maxResultSize).get) .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key])) Can you try this ? On Tue, Apr 28, 2015 at 11:11 AM, madhvi madhvi.gu...@orkash.com wrote: Hi, While connecting to accumulo through spark by making sparkRDD I am getting the following error: object not serializable (class: org.apache.accumulo.core.data.Key) This is due to the 'key' class of accumulo which does not implement serializable interface. How it can be solved and accumulo can be used with spark Thanks Madhvi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Deepak Hi Deepak, The snippet you provided is in Scala, but I am working in Java. I am trying the same thing in Java; could you please specify in detail what the parameters you mentioned there, such as 'arguments', are? Thanks Madhvi
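Since the question is about Java: SparkConf.registerKryoClasses is callable from Java too (it takes an array of Class objects), and the arguments.get(...) calls in the snippet above are just that poster's own map of command-line options, not a Spark API — the buffer sizes can simply be hard-coded. If you prefer to keep it out of code entirely, the same setup can be expressed as plain configuration, e.g. in spark-defaults.conf (the buffer size is illustrative):

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryo.classesToRegister     org.apache.accumulo.core.data.Key
spark.kryoserializer.buffer.mb   64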
Re: Understanding Spark's caching
Option B would be fine, as in the SO itself the answer says, Since RDD transformations merely build DAG descriptions without execution, in Option A by the time you call unpersist, you still only have job descriptions and not a running execution. Also note, In Option A, you are not specifying any action anywhere. Thanks Best Regards On Tue, Apr 28, 2015 at 12:58 AM, Eran Medan eran.me...@gmail.com wrote: Hi Everyone! I'm trying to understand how Spark's cache work. Here is my naive understanding, please let me know if I'm missing something: val rdd1 = sc.textFile(some data) rdd.cache() //marks rdd as cached val rdd2 = rdd1.filter(...) val rdd3 = rdd1.map(...) rdd2.saveAsTextFile(...) rdd3.saveAsTextFile(...) In the above, rdd1 will be loaded from disk (e.g. HDFS) only once. (when rdd2 is saved I assume) and then from cache (assuming there is enough RAM) when rdd3 is saved) Now here is my question. Let's say I want to cache rdd2 and rdd3 as they will both be used later on, but I don't need rdd1 after creating them. Basically there is duplication, isn't it? Since once rdd2 and rdd3 are calculated, I don't need rdd1 anymore, I should probably unpersist it, right? the question is when? *Will this work? (Option A)* val rdd1 = sc.textFile(some data) rdd.cache() //marks rdd as cached val rdd2 = rdd1.filter(...) val rdd3 = rdd1.map(...) rdd2.cache() rdd3.cache() rdd1.unpersist() Does spark add the unpersist call to the DAG? or is it done immediately? if it's done immediately, then basically rdd1 will be non cached when I read from rdd2 and rdd3, right? *Should I do it this way instead (Option B)?* val rdd1 = sc.textFile(some data) rdd.cache() //marks rdd as cached val rdd2 = rdd1.filter(...) val rdd3 = rdd1.map(...) rdd2.cache() rdd3.cache() rdd2.saveAsTextFile(...) rdd3.saveAsTextFile(...) rdd1.unpersist() *So the question is this:* Is Option A good enough? e.g. will rdd1 be still accessing the file only once? Or do I need to go with Option B? (see also http://stackoverflow.com/questions/29903675/understanding-sparks-caching) Thanks in advance
How to add jars to standalone pyspark program
Hi, I'm trying to figure out how to use a third party jar inside a python program which I'm running via PyCharm in order to debug it. I am normally able to run spark code in python such as this: spark_conf = SparkConf().setMaster('local').setAppName('test') sc = SparkContext(conf=spark_conf) cars = sc.textFile('c:/cars.csv') print cars.count() sc.stop() The code I'm trying to run is below - it uses the databricks spark csv jar. I can get it working fine in pyspark shell using the packages argument, but I can't figure out how to get it to work via PyCharm. from pyspark.sql import SQLContext from pyspark import SparkConf, SparkContext spark_conf = SparkConf().setMaster('local').setAppName('test') sc = SparkContext(conf=spark_conf) sqlContext = SQLContext(sc) df = sqlContext.load(source=com.databricks.spark.csv, header=true, path = c:/cars.csv, delimiter='\t') df.select(year) The error message I'm getting is: py4j.protocol.Py4JJavaError: An error occurred while calling o20.load. : java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) I presume I need to set the spark classpath somehow but I'm not sure of the right way to do it. Any advice/guidance would be appreciated. Thanks, Mark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-tp22685.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
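One approach often suggested for running such a script straight from an IDE is to hand the package to the JVM that PySpark launches, via the PYSPARK_SUBMIT_ARGS environment variable, before the SparkContext is created. A hedged sketch — the package coordinate is illustrative and whether the trailing pyspark-shell token is needed depends on the Spark version:

import os
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages com.databricks:spark-csv_2.10:1.0.3 pyspark-shell")

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("test"))
sqlContext = SQLContext(sc)
df = sqlContext.load(source="com.databricks.spark.csv",
                     header="true", path="c:/cars.csv", delimiter="\t")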