Spark job for demoing Spark metrics monitoring?
Hi, I'll be showing our Spark monitoring http://blog.sematext.com/2014/10/07/apache-spark-monitoring/ at the upcoming Spark Summit in NYC. I'd like to run some/any Spark job that really exercises Spark and makes it emit all its various metrics (so the metrics charts are full of data and not blank or flat and boring). Since we don't use Spark at Sematext yet, I was wondering if anyone could recommend some Spark app/job that's easy to run, just to get some Spark job to start emitting various Spark metrics? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/
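In case a concrete starting point helps, here is a hedged sketch of a tiny self-contained Scala job (my own illustration, not an existing benchmark) that caches data and shuffles in a loop, so executor, shuffle and storage metrics keep emitting over time:

    import org.apache.spark.{SparkConf, SparkContext}

    object MetricsDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("MetricsDemo"))
        // A cached RDD exercises the storage/memory metrics.
        val nums = sc.parallelize(1 to 10000000, 100).cache()
        for (_ <- 1 to 100) {
          // reduceByKey forces a shuffle; count forces full evaluation.
          nums.map(n => (n % 1000, 1L)).reduceByKey(_ + _).count()
          Thread.sleep(5000) // space iterations out so the charts show a time series
        }
        sc.stop()
      }
    }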
Re: sparkcontext.objectFile returns thousands of partitions
You have 8 files, not 8 partitions. It does not follow that they should be read back as 8 partitions: they are presumably large, so you would be stuck using at most 8 tasks in parallel to process them. The number of partitions is determined by the Hadoop input splits, which generally yield one partition per block of data. If you know that this number is too high, you can request a number of partitions when you read the file. Don't coalesce; just read the desired number from the start.

On Jan 21, 2015 4:32 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:

Why does sc.objectFile(...) return an RDD with thousands of partitions? I save an RDD to the file system using

rdd.saveAsObjectFile("file:///tmp/mydir")

Note that the RDD contains 7 million objects. I checked the directory /tmp/mydir/; it contains 8 partitions:

part-0 part-1 part-2 part-3 part-4 part-5 part-6 part-7 _SUCCESS

I then load the RDD back using

val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)

I expect rdd2 to have 8 partitions, but from the master UI I see that rdd2 has over 1000 partitions. This is very inefficient. How can I limit it to 8 partitions, just like what is stored on the file system?

Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541
Re: [SQL] Using HashPartitioner to distribute by column
Hello Michael, In Spark SQL we have internal concepts of Output Partitioning (representing the partitioning scheme of an operator's output) and Required Child Distribution (representing the required input data distribution of an operator) for a physical operator. Say we have two operators, parent and child, where the parent takes the output of the child as its input. At the end of the query planning process, whenever the Output Partitioning of the child does not satisfy the Required Child Distribution of the parent, we add an Exchange operator between the parent and child to shuffle the data. Right now, we do not record the partitioning scheme of an input table. So even if you use partitionBy (or DISTRIBUTE BY in SQL) to prepare your data, you will still see the Exchange operator, and your GROUP BY operation will be executed in a new stage (after the Exchange). Making Spark SQL aware of the partitioning scheme of input tables is a useful optimization. I just created https://issues.apache.org/jira/browse/SPARK-5354 to track it. Thanks, Yin

On Wed, Jan 21, 2015 at 8:43 AM, Michael Davies michael.belldav...@gmail.com wrote: Hi Cheng, Are you saying that by setting up the lineage

schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

then Spark SQL will know that an SQL "group by" on CustomerCode will not have to shuffle? But the prepared data will have already been shuffled, so we pay an upfront cost for future groupings (assuming we cache, I suppose). Mick

On 20 Jan 2015, at 20:44, Cheng Lian lian.cs@gmail.com wrote: First of all, even if the underlying dataset is partitioned as expected, a shuffle can't be avoided, because Spark SQL knows nothing about the underlying data distribution. However, this does reduce network IO. You can prepare your data like this (say CustomerCode is a string field with ordinal 1):

val schemaRdd = sql(...)
val schema = schemaRdd.schema
val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

n should be equal to spark.sql.shuffle.partitions. Cheng

On 1/19/15 7:44 AM, Mick Davies wrote: Is it possible to use a HashPartitioner or something similar to distribute a SchemaRDD's data by the hash of a particular column or set of columns? Having done this, I would then hope that GROUP BY could avoid a shuffle. E.g. set up a HashPartitioner on the CustomerCode field so that SELECT CustomerCode, SUM(Cost) FROM Orders GROUP BY CustomerCode would not need to shuffle. Cheers Mick
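For reference, a minimal sketch (my illustration, not from the thread) of the SQL-side variant Yin mentions, assuming a HiveContext since DISTRIBUTE BY comes from the HiveQL dialect; as Yin explains above, current versions still plan an Exchange for a later GROUP BY because the result's partitioning is not recorded:

    // Hedged sketch: hash-distribute rows by CustomerCode in SQL.
    val distributed = hiveContext.sql(
      "SELECT CustomerCode, Cost FROM Orders DISTRIBUTE BY CustomerCode")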
Re: Spark job stuck at RangePartitioner at Exchange.scala:79
I was able to resolve this by adding rdd.collect() after every stage. This enforced RDD evaluation and helped avoid the choke point. regards Sunita Kopppar

On Sat, Jan 17, 2015 at 12:56 PM, Sunita Arvind sunitarv...@gmail.com wrote: Hi, My spark jobs suddenly started getting hung; here is the debug leading up to it. Following the program, it seems to be stuck whenever I do any collect(), count() or rdd.saveAsParquetFile(). AFAIK, any operation that requires data to flow back to the master causes this. I increased the memory to 5 MB. Also, as per the debug statements, the memory is sufficient. I also increased -Xss.

15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(264808) called with curMem=0, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 258.6 KB, free 972.3 MB)
15/01/17 11:44:16 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:85
15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(210344) called with curMem=264808, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 205.4 KB, free 972.1 MB)
15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(282200) called with curMem=475152, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 275.6 KB, free 971.8 MB)
15/01/17 11:44:16 INFO spark.SparkContext: Starting job: RangePartitioner at Exchange.scala:79

A bit of background which may or may not be relevant: the program was working fine in Eclipse, but was getting hung upon submission to the cluster. In an attempt to debug, I changed the version in build.sbt to match the one on the cluster. The sbt config when the program was working:

"org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
"org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
"spark.jobserver" % "job-server-api" % "0.4.0",
"com.github.nscala-time" %% "nscala-time" % "1.6.0",
"org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"

During debugging, I changed this to:

"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
"org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
"spark.jobserver" % "job-server-api" % "0.4.0",
"com.github.nscala-time" %% "nscala-time" % "1.6.0",
"org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided"

This is when the program started getting hung at the first rdd.count(). Now, even after reverting the changes in build.sbt, my program gets hung at the same point. I tried these config changes in addition to setting -Xmx and -Xss in eclipse.ini to 5 MB each, and set the variables below programmatically:

sparkConf.set("spark.akka.frameSize", "10")
sparkConf.set("spark.shuffle.spill", "true")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "1g")
sparkConf.set("spark.driver.maxResultSize", "1g")

Please note: in Eclipse as well as sbt, the program kept throwing StackOverflowError; increasing -Xss to 5 MB eliminated that problem. Could this be something unrelated to memory? The SchemaRDDs have close to 400 columns, hence I am using StructType(StructField) and performing applySchema. My code cannot be shared right now; if required, I will edit it and post. regards Sunita
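A minimal sketch of the workaround described above (the input RDD and the parse/transform functions are hypothetical placeholders); count() is usually a cheaper way to force evaluation than collect(), since it does not pull the whole dataset back to the driver:

    // Hedged sketch: force evaluation stage by stage instead of letting one
    // long lineage evaluate lazily at the end.
    val stage1 = input.map(parse).persist()
    stage1.count() // materializes stage1 on the executors
    val stage2 = stage1.map(transform).persist()
    stage2.count()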
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
numStreams is 5 in my case.

List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
        StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);

On Wed, Jan 21, 2015 at 3:19 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi Mukesh, How are you creating your receivers? Could you post the (relevant) code? -kr, Gerard.

On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Guys, I've repartitioned my kafkaStream so that it gets evenly distributed among the executors, and the results are better. Still, from the executors page it seems that all 8 cores of only one executor are getting used, and the other executors are using just 1 core. Is this the correct interpretation based on the data below? If so, how can we fix this?

On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das tathagata.das1...@gmail.com wrote: That is kind of expected due to data locality, though you should see some tasks running on the other executors as the data gets replicated to other nodes and tasks can then run based on locality. You have two solutions: 1. kafkaStream.repartition() to explicitly repartition the received data across the cluster. 2. Create multiple kafka streams and union them together. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com wrote: Thanks Sandy, it was the issue with the number of cores. Another issue I was facing is that tasks are not getting distributed evenly among all executors and are running at the NODE_LOCAL locality level, i.e. all the tasks are running on the same executor where my kafka receiver(s) are running, even though other executors are idle. I configured spark.locality.wait=50 instead of the default 3000 ms, which forced the task rebalancing among nodes; let me know if there is a better way to deal with this.

On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com wrote: Makes sense. I've also tried it in standalone mode, where all 3 workers and the driver were running on the same 8-core box, and the results were similar. Anyway, I will share the results in YARN mode with 8-core YARN containers.

On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com wrote: When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no? -Sandy

On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com wrote: Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver as per the Spark application master UI.

spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8

On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8?
On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver as shown in the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy

On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has a greater number of workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5sec batch is reading data from kafka and processing it in 5sec on the standalone cluster each 5sec
Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)
We have not hit this issue, so I'm not sure whether there are bugs related to the reused worker or not. Could you provide more details about it?

On Wed, Jan 21, 2015 at 2:27 AM, critikaled isasmani@gmail.com wrote: I'm also facing the same issue. Is this a bug?
How to delete graph checkpoints?
This is a question about checkpointing in GraphX. We'd like to automate deleting the checkpoint files of old graphs. The RDD class has a getCheckpointFile() function, which allows us to retrieve the checkpoint file of an old RDD and then delete it. However, I couldn't find a way to get hold of the corresponding checkpointed RDDs given the graph reference, because the checkpoint of a GraphImpl is really done to the underlying partitionsRDD of both the VertexRDD and the EdgeRDD, and that partitionsRDD as defined today doesn't seem to be accessible from outside of graphx. Below is the declaration in VertexRDD.scala:

private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]

We would really appreciate it if anyone could shed some light on solving this problem, or if anyone who has come across a similar problem could share a solution or workaround. Thank you, Cheuk Lam
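One possible workaround sketch, which is my assumption rather than something from this thread: give each generation of graphs its own checkpoint directory and delete old directories wholesale, instead of chasing the private partitionsRDDs. The paths and the generation counter are hypothetical, and it assumes checkpointing the graph's RDDs forwards to the underlying partitionsRDDs as described above:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hedged sketch: per-generation checkpoint directories.
    sc.setCheckpointDir(s"hdfs:///checkpoints/graph-$generation")
    graph.vertices.checkpoint()
    graph.edges.checkpoint()
    // ...once the next generation is materialized, drop the old directory:
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path(s"hdfs:///checkpoints/graph-${generation - 1}"), true)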
Re: sparkcontext.objectFile returns thousands of partitions
Maybe each of the file parts has many blocks? Did you try SparkContext.coalesce to reduce the number of partitions? It can be done with or without a data shuffle.

Noam Barcay Developer // Kenshoo Office +972 3 746-6500 *427 // Mobile +972 54 475-3142 www.Kenshoo.com http://kenshoo.com/

On Wed, Jan 21, 2015 at 5:31 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:

Why does sc.objectFile(...) return an RDD with thousands of partitions? I save an RDD to the file system using

rdd.saveAsObjectFile("file:///tmp/mydir")

Note that the RDD contains 7 million objects. I checked the directory /tmp/mydir/; it contains 8 partitions:

part-0 part-1 part-2 part-3 part-4 part-5 part-6 part-7 _SUCCESS

I then load the RDD back using

val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)

I expect rdd2 to have 8 partitions, but from the master UI I see that rdd2 has over 1000 partitions. This is very inefficient. How can I limit it to 8 partitions, just like what is stored on the file system?

Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541
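A minimal sketch of the coalesce suggestion, using the rdd2 from the quoted message; with shuffle = false the existing partitions are merged without moving data across the network:

    // Hedged sketch: merge thousands of read partitions down to 8.
    val rdd8 = rdd2.coalesce(8, shuffle = false)
    // With shuffle = true, data is rebalanced evenly at the cost of a shuffle:
    // val rdd8 = rdd2.coalesce(8, shuffle = true)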
Re: How to 'Pipe' Binary Data in Apache Spark
Hi Venkat/Nick, The Spark RDD.pipe method pipes text data into a subprocess and then receives text data back from that process. Once you have the binary data loaded into an RDD properly, to pipe binary data to/from a subprocess (e.g., you want the data in the pipes to contain binary, not text), you need to implement your own, modified version of RDD.pipe. The implementation of RDD.pipe spawns a process per partition (IIRC), as well as threads for writing to and reading from the process (as well as stderr for the process). When writing via RDD.pipe, Spark calls *.toString on each object and pushes that text representation down the pipe. There is an example of how to pipe binary data from within a mapPartitions call using the Scala API in lines 107-177 of this file. This specific code contains some nastiness around the packaging of downstream libraries that we rely on in that project, so I'm not sure if it is the cleanest way, but it is a workable way. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466

On Jan 21, 2015, at 9:17 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote: I am trying to solve a similar problem. I am using option #2 as suggested by Nick. I have created an RDD with sc.binaryFiles for a list of .wav files, but I am not able to pipe it to the external programs. For example:

sq = sc.binaryFiles("wavfiles")  # all .wav files stored in the "wavfiles" directory on HDFS
sq.keys().collect()    # works fine, shows the list of file names
sq.values().collect()  # works fine, shows the content of the files
sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t' 'wav', '-', '-n', 'stats'])).collect()  # does not work; tried different options

The last line fails with: AttributeError: 'function' object has no attribute 'read'. Any suggestions? Regards, Venkat Ankam

From: Nick Allen [mailto:n...@nickallen.org] Sent: Friday, January 16, 2015 11:46 AM To: user@spark.apache.org Subject: Re: How to 'Pipe' Binary Data in Apache Spark

I just wanted to reiterate the solution for the benefit of the community. The problem is not from my use of 'pipe', but that 'textFile' cannot be used to read in binary data. (Doh) There are a couple of options to move forward. 1. Implement a custom 'InputFormat' that understands the binary input data. (Per Sean Owen) 2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single record. This will impact performance as it prevents the use of more than one mapper on the file's data. In my specific case for #1 I can only find one project from RIPE-NCC (https://github.com/RIPE-NCC/hadoop-pcap) that does this. Unfortunately, it appears to only support a limited set of network protocols.

On Fri, Jan 16, 2015 at 10:40 AM, Nick Allen n...@nickallen.org wrote: Per your last comment, it appears I need something like this: https://github.com/RIPE-NCC/hadoop-pcap Thanks a ton. That got me oriented in the right direction.

On Fri, Jan 16, 2015 at 10:20 AM, Sean Owen so...@cloudera.com wrote: Well, it looks like you're reading some kind of binary file as text. That isn't going to work, in Spark or elsewhere, as binary data is not even necessarily a valid encoding of a string. There are no line breaks to delimit lines and thus elements of the RDD. Your input has some record structure (or else it's not really useful to put it into an RDD). You can encode this as a SequenceFile and read it with objectFile. You could also write a custom InputFormat that knows how to parse pcap records directly.
On Fri, Jan 16, 2015 at 3:09 PM, Nick Allen n...@nickallen.org wrote: I have an RDD containing binary data. I would like to use 'RDD.pipe' to pipe that binary data to an external program that will translate it to string/text data. Unfortunately, it seems that Spark is mangling the binary data before it gets passed to the external program. This code is representative of what I am trying to do. What am I doing wrong? How can I pipe binary data in Spark? Maybe it is getting corrupted when I read it in initially with 'textFile'?

bin = sc.textFile("binary-data.dat")
csv = bin.pipe("/usr/bin/binary-to-csv.sh")
csv.saveAsTextFile("text-data.csv")

Specifically, I am trying to use Spark to transform pcap (packet capture) data to text/csv so that I can perform an analysis on it. Thanks! -- Nick Allen n...@nickallen.org
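For what it's worth, here is a hedged Scala sketch of the modified-RDD.pipe idea Frank describes above: spawn a subprocess per partition inside mapPartitions and write raw bytes to its stdin, instead of relying on pipe's toString-based text protocol. The RDD[Array[Byte]] input and the command are illustrative assumptions, not code from the linked project:

    import java.io.BufferedOutputStream
    import org.apache.spark.rdd.RDD
    import scala.io.Source

    def pipeBinary(records: RDD[Array[Byte]], command: Seq[String]): RDD[String] =
      records.mapPartitions { iter =>
        val proc = new ProcessBuilder(command: _*).start()
        // Feed stdin from a separate thread so a full pipe buffer cannot
        // deadlock us against reading stdout.
        new Thread("binary-pipe-stdin") {
          override def run(): Unit = {
            val stdin = new BufferedOutputStream(proc.getOutputStream)
            try iter.foreach(bytes => stdin.write(bytes)) finally stdin.close()
          }
        }.start()
        Source.fromInputStream(proc.getInputStream).getLines()
      }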
Re: spark 1.2 three times slower than spark 1.1, why?
Is it possible to re-run your job with spark.eventLog.enabled set to true, and send the resulting logs to the list? Those have more per-task information that can help diagnose this. -Kay

On Wed, Jan 21, 2015 at 1:57 AM, Fengyun RAO raofeng...@gmail.com wrote: btw: Shuffle Write (11 GB) means 11 GB per executor; for each task, it's ~40 MB.

2015-01-21 17:53 GMT+08:00 Fengyun RAO raofeng...@gmail.com: I don't know how to debug a distributed application; any tools or suggestions? From the Spark web UI, the GC time (~0.1 s) and Shuffle Write (11 GB) are similar for Spark 1.1 and 1.2, and there are no Shuffle Read and no Spill. The only difference is Duration:

           Min    25th percentile   Median   75th percentile   Max
spark 1.2  4s     37s               45s      53s               1.9 min
spark 1.1  2 s    17 s              18 s     18 s              34 s

2015-01-21 16:56 GMT+08:00 Sean Owen so...@cloudera.com: I mean that if you had tasks running on 10 machines now instead of 3 for some reason, you would have more than 3 times the read load on your source of data all at once. Same if you made more executors per machine. But from your additional info it does not sound like this is the case. I think you need more debugging to pinpoint what is slower.

On Jan 21, 2015 9:30 AM, Fengyun RAO raofeng...@gmail.com wrote: Thanks, Sean. I don't quite understand "you have *more* partitions across *more* workers". It's within the same cluster, with the same data, thus I think the same partitions and the same workers. We switched from Spark 1.1 to 1.2, and then it was 3x slower. (We upgraded from CDH 5.2.1 to CDH 5.3, hence Spark 1.1 to 1.2, and found the problem. Then we installed a standalone Spark 1.1, stopped the 1.2, ran the same script, and it was 3x faster. Stop 1.1, start 1.2, 3x slower again.)

2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons.

On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: The LogParser instance is not serializable, and thus cannot be a broadcast. What's worse, it contains an LRU cache, which is essential to the performance, and we would like to share it among all the tasks on the same node. If that is the case, what's the recommended way to share a variable among all the tasks within the same executor?

2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serializing the closure causes LogParser to no longer be a singleton, so it is initialized for every task. Could you change it to a Broadcast?

On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from Spark 1.1 to Spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program on Spark 1.1 has successfully processed a whole year of data, quite stably. The main script is as below:

sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})

where LogParser is a singleton which may take some time to initialize and is shared across the executor. The flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but it didn't help. Can somebody explain possible causes, or what we should test or change to find it out?
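A minimal sketch of Kay's suggestion; spark.eventLog.enabled and spark.eventLog.dir are standard properties, though the directory below is an assumption and must exist and be writable:

    // Hedged sketch: record per-task metrics to the event log.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///spark-events") // assumed path

The same can be passed at submit time, e.g. spark-submit --conf spark.eventLog.enabled=true.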
Re: [SQL] Using HashPartitioner to distribute by column
Michael - I mean that although preparing and repartitioning the underlying data can't avoid the shuffle introduced by Spark SQL (Yin has explained why), it does help to reduce network IO.

On 1/21/15 10:01 AM, Yin Huai wrote: Hello Michael, In Spark SQL we have internal concepts of Output Partitioning (representing the partitioning scheme of an operator's output) and Required Child Distribution (representing the required input data distribution of an operator) for a physical operator. Say we have two operators, parent and child, where the parent takes the output of the child as its input. At the end of the query planning process, whenever the Output Partitioning of the child does not satisfy the Required Child Distribution of the parent, we add an Exchange operator between the parent and child to shuffle the data. Right now, we do not record the partitioning scheme of an input table. So even if you use partitionBy (or DISTRIBUTE BY in SQL) to prepare your data, you will still see the Exchange operator, and your GROUP BY operation will be executed in a new stage (after the Exchange). Making Spark SQL aware of the partitioning scheme of input tables is a useful optimization. I just created https://issues.apache.org/jira/browse/SPARK-5354 to track it. Thanks, Yin

On Wed, Jan 21, 2015 at 8:43 AM, Michael Davies michael.belldav...@gmail.com wrote: Hi Cheng, Are you saying that by setting up the lineage

schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

then Spark SQL will know that an SQL "group by" on CustomerCode will not have to shuffle? But the prepared data will have already been shuffled, so we pay an upfront cost for future groupings (assuming we cache, I suppose). Mick

On 20 Jan 2015, at 20:44, Cheng Lian lian.cs@gmail.com wrote: First of all, even if the underlying dataset is partitioned as expected, a shuffle can't be avoided, because Spark SQL knows nothing about the underlying data distribution. However, this does reduce network IO. You can prepare your data like this (say CustomerCode is a string field with ordinal 1):

val schemaRdd = sql(...)
val schema = schemaRdd.schema
val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

n should be equal to spark.sql.shuffle.partitions. Cheng

On 1/19/15 7:44 AM, Mick Davies wrote: Is it possible to use a HashPartitioner or something similar to distribute a SchemaRDD's data by the hash of a particular column or set of columns? Having done this, I would then hope that GROUP BY could avoid a shuffle. E.g. set up a HashPartitioner on the CustomerCode field so that SELECT CustomerCode, SUM(Cost) FROM Orders GROUP BY CustomerCode would not need to shuffle. Cheers Mick
RE: How to 'Pipe' Binary Data in Apache Spark
I am trying to solve a similar problem. I am using option #2 as suggested by Nick. I have created an RDD with sc.binaryFiles for a list of .wav files, but I am not able to pipe it to the external programs. For example:

sq = sc.binaryFiles("wavfiles")  # all .wav files stored in the "wavfiles" directory on HDFS
sq.keys().collect()    # works fine, shows the list of file names
sq.values().collect()  # works fine, shows the content of the files
sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t' 'wav', '-', '-n', 'stats'])).collect()  # does not work; tried different options

The last line fails with: AttributeError: 'function' object has no attribute 'read'. Any suggestions? Regards, Venkat Ankam

From: Nick Allen [mailto:n...@nickallen.org] Sent: Friday, January 16, 2015 11:46 AM To: user@spark.apache.org Subject: Re: How to 'Pipe' Binary Data in Apache Spark

I just wanted to reiterate the solution for the benefit of the community. The problem is not from my use of 'pipe', but that 'textFile' cannot be used to read in binary data. (Doh) There are a couple of options to move forward. 1. Implement a custom 'InputFormat' that understands the binary input data. (Per Sean Owen) 2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single record. This will impact performance as it prevents the use of more than one mapper on the file's data. In my specific case for #1 I can only find one project from RIPE-NCC (https://github.com/RIPE-NCC/hadoop-pcap) that does this. Unfortunately, it appears to only support a limited set of network protocols.

On Fri, Jan 16, 2015 at 10:40 AM, Nick Allen n...@nickallen.org wrote: Per your last comment, it appears I need something like this: https://github.com/RIPE-NCC/hadoop-pcap Thanks a ton. That got me oriented in the right direction.

On Fri, Jan 16, 2015 at 10:20 AM, Sean Owen so...@cloudera.com wrote: Well, it looks like you're reading some kind of binary file as text. That isn't going to work, in Spark or elsewhere, as binary data is not even necessarily a valid encoding of a string. There are no line breaks to delimit lines and thus elements of the RDD. Your input has some record structure (or else it's not really useful to put it into an RDD). You can encode this as a SequenceFile and read it with objectFile. You could also write a custom InputFormat that knows how to parse pcap records directly.

On Fri, Jan 16, 2015 at 3:09 PM, Nick Allen n...@nickallen.org wrote: I have an RDD containing binary data. I would like to use 'RDD.pipe' to pipe that binary data to an external program that will translate it to string/text data. Unfortunately, it seems that Spark is mangling the binary data before it gets passed to the external program. This code is representative of what I am trying to do. What am I doing wrong? How can I pipe binary data in Spark? Maybe it is getting corrupted when I read it in initially with 'textFile'?

bin = sc.textFile("binary-data.dat")
csv = bin.pipe("/usr/bin/binary-to-csv.sh")
csv.saveAsTextFile("text-data.csv")

Specifically, I am trying to use Spark to transform pcap (packet capture) data to text/csv so that I can perform an analysis on it. Thanks! -- Nick Allen n...@nickallen.org
RE: Spark 1.1.0 - spark-submit failed
Thanks for the help. I added the following dependency in my pom file and the problem went away.

<dependency>
  <!-- default Netty -->
  <groupId>io.netty</groupId>
  <artifactId>netty</artifactId>
  <version>3.6.6.Final</version>
</dependency>

Ey-Chih

Date: Tue, 20 Jan 2015 16:57:20 -0800 Subject: Re: Spark 1.1.0 - spark-submit failed From: yuzhih...@gmail.com To: eyc...@hotmail.com CC: user@spark.apache.org

Please check which netty jar(s) are on the classpath. NioWorkerPool(Executor workerExecutor, int workerCount) was added in netty 3.5.4. Cheers

On Tue, Jan 20, 2015 at 4:15 PM, ey-chih chow eyc...@hotmail.com wrote: Hi, I issued the following command in an EC2 cluster launched using spark-ec2:

~/spark/bin/spark-submit --class com.crowdstar.cluster.etl.ParseAndClean --master spark://ec2-54-185-107-113.us-west-2.compute.amazonaws.com:7077 --deploy-mode cluster --total-executor-cores 4 file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar /ETL/input/2015/01/10/12/10Jan2015.avro file:///tmp/etl-admin/vertica/VERTICA.avdl file:///tmp/etl-admin/vertica/extras.json file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar

The command failed with the following error logs in the Spark UI. Is there any suggestion on how to fix the problem? Thanks. Ey-Chih Chow

==

Launch Command: /usr/lib/jvm/java-1.7.0/bin/java -cp /root/spark/work/driver-20150120200843-/spark-etl-0.0.1-SNAPSHOT.jar:/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/root/spark/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark/lib/datanucleus-core-3.2.2.jar:/root/spark/lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize=128m -Dspark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/ -Dspark.executor.memory=13000m -Dspark.akka.askTimeout=10 -Dspark.cores.max=4 -Dspark.app.name=com.crowdstar.cluster.etl.ParseAndClean -Dspark.jars=file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar -Dspark.executor.extraClassPath=/root/ephemeral-hdfs/conf -Dspark.master=spark://ec2-54-203-58-2.us-west-2.compute.amazonaws.com:7077 -Dakka.loglevel=WARNING -Xms512M -Xmx512M org.apache.spark.deploy.worker.DriverWrapper akka.tcp://sparkwor...@ip-10-33-140-157.us-west-2.compute.internal:47585/user/Worker com.crowdstar.cluster.etl.ParseAndClean /ETL/input/2015/01/10/12/10Jan2015.avro file:///tmp/etl-admin/vertica/VERTICA.avdl file:///tmp/etl-admin/vertica/extras.json file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/spark/work/driver-20150120200843-/spark-etl-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/01/20 20:08:45 INFO spark.SecurityManager: Changing view acls to: root,
15/01/20 20:08:45 INFO spark.SecurityManager: Changing modify acls to: root,
15/01/20 20:08:45 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )
15/01/20 20:08:45 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/20 20:08:45 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem [Driver]
java.lang.NoSuchMethodError: org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(Ljava/util/concurrent/Executor;I)V
    at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:282)
    at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:239)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at scala.util.Success.flatMap(Try.scala:200)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    at
Re: [SparkSQL] Try2: Parquet predicate pushdown troubles
Oh yes, thanks for adding that using sc.hadoopConfiguration.set also works :-)

On Wed, Jan 21, 2015 at 7:11 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Thanks for looking, Cheng. Just to clarify in case other people need this sooner: setting sc.hadoopConfiguration.set("parquet.task.side.metadata", "false") did work well in terms of dropping row groups / showing a small input size. What was odd is that the overall time wasn't much better... but maybe that was overhead from sending the metadata client-side. Thanks again and looking forward to your fix.

On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Yana, Sorry for the late reply, missed this important thread somehow. And many thanks for reporting this. It turned out to be a bug: filter pushdown is only enabled when using client-side metadata, which is not expected, because the task-side metadata code path is more performant. And I guess the reason why setting parquet.task.side.metadata to false didn't reduce the input size for you is that you set the configuration with the Spark API, or put it into spark-defaults.conf. This configuration goes to the Hadoop Configuration, and Spark only merges properties whose names start with spark.hadoop into Hadoop Configuration instances. You may try to put the parquet.task.side.metadata config into Hadoop's core-site.xml and then re-run the query; I can see significant differences by doing so. I'll open a JIRA and deliver a fix for this ASAP. Thanks again for reporting all the details! Cheng

On 1/13/15 12:56 PM, Yana Kadiyska wrote: Attempting to bump this up in case someone can help out after all. I spent a few good hours stepping through the code today, so I'll summarize my observations, both in the hope of getting some help and to help others that might be looking into this: 1. I am setting spark.sql.parquet.filterPushdown=true 2. I can see by stepping through the driver debugger that ParquetTableOperations.execute sets the filters via ParquetInputFormat.setFilterPredicate (I checked the conf object; things appear OK there) 3. In FilteringParquetRowInputFormat, I go through the code path for getTaskSideSplits. It seems that the code path for getClientSideSplits would try to drop row groups, but I don't see anything similar in getTaskSideSplits. Does anyone have pointers on where to look after this? Where does row-group filtering happen in the case of getTaskSideSplits? I can attach to the executor but am not quite sure what code related to Parquet gets called executor-side... I also don't see any messages in the executor logs related to filtering predicates. For comparison, I went through getClientSideSplits and can see that predicate pushdown works OK:

sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy
15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800)
15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %) !

Is it possible that this is just a UI bug? I can see Input=4G when using (parquet.task.side.metadata, false) and Input=140G when using (parquet.task.side.metadata, true), but the runtimes are very comparable. JobId 4 is the client-side split, JobId 5 is the task-side split.
On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I am running the following (connecting to an external Hive Metastore):

/a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf spark.sql.parquet.filterPushdown=true

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

and then ran two queries:

sqlContext.sql("select count(*) from table where partition='blah'")
and
sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")

According to the Input tab in the UI, both scan about 140G of data, which is the size of my whole partition. So I have two questions: 1. Is there a way to tell from the plan if a predicate pushdown is supposed to happen? I see this for the second query:

res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#49L]
   OutputFaker []
    Project []
     ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

2. Am I doing something obviously wrong that this is not working? (I'm guessing it's not working because the input size for the second query shows unchanged and the execution time is almost 2x as long.) Thanks in advance for any insights.
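Summarizing the two configuration routes discussed in this thread as a short sketch (the prefixed form relies on Spark copying spark.hadoop.* properties into Hadoop Configuration instances, as Cheng describes above):

    // Hedged sketch: set the property directly on the SparkContext's Hadoop
    // configuration (the form that worked for Yana)...
    sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")
    // ...or via a prefixed Spark property, e.g. in spark-defaults.conf:
    //   spark.hadoop.parquet.task.side.metadata=false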
Re: [SQL] Using HashPartitioner to distribute by column
Hi Cheng, Are you saying that by setting up the lineage

schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

then Spark SQL will know that an SQL "group by" on CustomerCode will not have to shuffle? But the prepared data will have already been shuffled, so we pay an upfront cost for future groupings (assuming we cache, I suppose). Mick

On 20 Jan 2015, at 20:44, Cheng Lian lian.cs@gmail.com wrote: First of all, even if the underlying dataset is partitioned as expected, a shuffle can't be avoided, because Spark SQL knows nothing about the underlying data distribution. However, this does reduce network IO. You can prepare your data like this (say CustomerCode is a string field with ordinal 1):

val schemaRdd = sql(...)
val schema = schemaRdd.schema
val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

n should be equal to spark.sql.shuffle.partitions. Cheng

On 1/19/15 7:44 AM, Mick Davies wrote: Is it possible to use a HashPartitioner or something similar to distribute a SchemaRDD's data by the hash of a particular column or set of columns? Having done this, I would then hope that GROUP BY could avoid a shuffle. E.g. set up a HashPartitioner on the CustomerCode field so that SELECT CustomerCode, SUM(Cost) FROM Orders GROUP BY CustomerCode would not need to shuffle. Cheers Mick
Re: Discourse: A proposed alternative to the Spark User list
Josh / Patrick, What do y'all think of the idea of promoting Stack Overflow as a place to ask questions over this list, as long as the questions fit SO's guidelines (how-to-ask: http://stackoverflow.com/help/how-to-ask, dont-ask: http://stackoverflow.com/help/dont-ask)? The apache-spark tag (http://stackoverflow.com/questions/tagged/apache-spark) is very active on there. Discussions of all types are still on-topic here, but when possible we want to encourage people to use SO. Nick

On Wed Jan 21 2015 at 8:37:05 AM Jay Vyas jayunit100.apa...@gmail.com wrote: It's a very valid idea indeed, but... it's a tricky subject, since the entire ASF is run on mailing lists; hence there are so many different but equally sound ways of looking at this idea, which conflict with one another.

On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote: I think this is a really great idea for really opening up the discussions that happen here. Also, it would be nice to know why there doesn't seem to be much interest. Maybe I'm misunderstanding some nuance of Apache projects. Cheers
[mllib] Decision Tree - prediction probabilites of label classes
Hi, I use DecisionTree for multi-class classification. I can get the probability of the predicted label for every node in the decision tree from node.predict().prob(). Is it possible to retrieve or compute the probability of every possible label class in a node? To be more clear: say in node A there are 4 instances of label 0.0, 2 of label 1.0 and 3 of label 2.0. If I'm correct, predict().prob() is 4/9 in this case. I need the values 2/9 and 3/9 for the two other labels. It would be great to retrieve the exact counts of the label classes ([4, 2, 3] in the example), but I don't think that's possible now. Is something like this planned for a future release? Thanks!
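In the meantime, a hedged workaround sketch (my assumption, not from the thread): run the training data back through the model and aggregate labels per predicted value. This yields class counts per prediction value rather than per internal node, so it is only an approximation of what is asked for:

    // Hedged sketch, assuming a trained DecisionTreeModel `model` and an
    // RDD[LabeledPoint] `trainingData`.
    val counts = trainingData
      .map(p => (model.predict(p.features), p.label))
      .countByValue() // Map[(prediction, label), count]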
Re: spark 1.2 three times slower than spark 1.1, why?
On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast,

You could create an empty LogParser object (it's serializable), then load the data lazily in the executor. Could you add some logging to LogParser to check the behavior between Spark 1.1 and 1.2 (the number of times the data is loaded)?

what's worse, it contains an LRU cache, which is essential to the performance, and we would like to share it among all the tasks on the same node. If that is the case, what's the recommended way to share a variable among all the tasks within the same executor?

2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serializing the closure causes LogParser to no longer be a singleton, so it is initialized for every task. Could you change it to a Broadcast?

On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from Spark 1.1 to Spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program on Spark 1.1 has successfully processed a whole year of data, quite stably. The main script is as below:

sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})

where LogParser is a singleton which may take some time to initialize and is shared across the executor. The flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but it didn't help. Can somebody explain possible causes, or what we should test or change to find it out?
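A minimal sketch of the lazy-initialization pattern Davies suggests; the holder object and loadData() are hypothetical names for the expensive one-time setup:

    // Hedged sketch: a lazy val in a top-level object is initialized at most
    // once per executor JVM and shared by all tasks running in it.
    object LogParserHolder {
      lazy val parser: LogParser = {
        val p = new LogParser() // assumed cheap constructor
        p.loadData()            // assumed expensive one-time initialization
        p
      }
    }

    sc.textFile(inputPath)
      .flatMap(line => LogParserHolder.parser.parseLine(line))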
Connect to Hive metastore (on YARN) from Spark Shell?
Is this possible, and if so what steps do I need to take to make this happen?
Re: Connect to Hive metastore (on YARN) from Spark Shell?
You can put hive-site.xml in your conf/ directory. It will connect to Hive when the HiveContext is initialized. Thanks. Zhan Zhang

On Jan 21, 2015, at 12:35 PM, YaoPau jonrgr...@gmail.com wrote: Is this possible, and if so what steps do I need to take to make this happen?
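A minimal sketch of the flow, assuming hive-site.xml is on the classpath (e.g. in conf/) and a Spark 1.2-era shell where sc already exists:

    // Hedged sketch: HiveContext picks up the metastore settings from
    // hive-site.xml when it is created.
    import org.apache.spark.sql.hive.HiveContext
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SHOW TABLES").collect().foreach(println)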
Re: Discourse: A proposed alternative to the Spark User list
Hi, I tried to find the last reply by Nick Chammas (that I received in the digest) using the Nabble web interface, but I cannot find it (perhaps he didn't reply directly to the user list?). That's one example of Nabble's usability. Anyhow, I wanted to add my two cents... The Apache user group could be frozen (not accepting new questions, if that's possible) and redirect users to Stack Overflow (automatic reply?). Old questions remain (and are searchable) on Nabble; new questions go to Stack Exchange, so there is no need for migration. That's the idea, at least, as I'm not sure whether it's technically doable... Is it? The dev mailing list could perhaps stay on Nabble (it's not that busy), or have a special tag on Stack Exchange. Another thing, about the new Stack Exchange site I proposed earlier: if a new site is created, there is no problem with guidelines, I think, because the Spark community can apply different guidelines for the new site. There is a FAQ about creating new sites: http://area51.stackexchange.com/faq It says: "Stack Exchange sites are free to create and free to use. All we ask is that you have an enthusiastic, committed group of expert users who check in regularly, asking and answering questions." I think this requirement is satisfied... Someone expressed a concern that they won't allow creating a project-specific site, but there already exist some project-specific sites, like Tor, Drupal, Ubuntu... Later, though, the FAQ also says: "If Y already exists, it already has a tag for X, and nobody is complaining (then you should not create a new site)." But we could complain :) The advantage of having a separate site is that users, who should have more privileges, would need to earn them through Spark questions and answers only. The other thing, already mentioned, is that the community could create Spark-specific guidelines. There are also 'meta' sites for asking questions like this one, etc. There is a process for starting a site; it's not instantaneous. A new site needs to go through private beta and public beta, so that could be a drawback. Like btiernay, I must say: there might be something about Apache projects and mailing lists that I do not know, so excuse me if that is the case...
Is Apache Spark less accurate than Scikit Learn?
I've recently been trying to get to know Apache Spark as a replacement for Scikit Learn; however, it seems to me that even in simple cases, Scikit converges to an accurate model far faster than Spark does. For example, I generated 1000 data points for a very simple linear function (z=x+y) with the following script: http://pastebin.com/ceRkh3nb I then ran the following Scikit script: http://pastebin.com/1aECPfvq And then this Spark script (with spark-submit filename, no other arguments): http://pastebin.com/s281cuTL Strangely though, the error given by Spark is an order of magnitude larger than that given by Scikit (0.185 and 0.045 respectively), despite the two models having a nearly identical setup (as far as I can tell). I understand that this is using SGD with very few iterations, so the results may differ, but I wouldn't have thought it would be anywhere near such a large difference or such a large error, especially given the exceptionally simple data. Is there something I'm misunderstanding in Spark? Is it not correctly configured? Surely I should be getting a smaller error than that?
Re: Is Apache Spark less accurate than Scikit Learn?
I don't get those results. I get:

spark         0.14
scikit-learn  0.85

The scikit-learn MSE is due to the very low eta0 setting. Tweak that to 0.1 and push iterations to 400 and you get an MSE ~= 0. Of course the coefficients are both ~1 and the intercept ~0. Similarly, if you change the MLlib step size to 0.5 and the number of iterations to 1200, you again get a very low MSE. One of the issues with SGD is that you have to tweak these parameters to tune the algorithm. FWIW I wouldn't see Spark MLlib as a replacement for scikit-learn; MLlib is nowhere near as mature as scikit-learn. However, if you have large datasets that won't sensibly fit the scikit-learn in-core model, MLlib is one of the top choices. Similarly, if you are running proofs of concept that you are eventually going to scale up to production environments, then there is a definite argument for using MLlib at both the PoC and production stages.

On 21 Jan 2015, at 20:39, JacquesH jaaksem...@gmail.com wrote: I've recently been trying to get to know Apache Spark as a replacement for Scikit Learn; however, it seems to me that even in simple cases, Scikit converges to an accurate model far faster than Spark does. For example, I generated 1000 data points for a very simple linear function (z=x+y) with the following script: http://pastebin.com/ceRkh3nb I then ran the following Scikit script: http://pastebin.com/1aECPfvq And then this Spark script (with spark-submit filename, no other arguments): http://pastebin.com/s281cuTL Strangely though, the error given by Spark is an order of magnitude larger than that given by Scikit (0.185 and 0.045 respectively), despite the two models having a nearly identical setup (as far as I can tell). I understand that this is using SGD with very few iterations, so the results may differ, but I wouldn't have thought it would be anywhere near such a large difference or such a large error, especially given the exceptionally simple data. Is there something I'm misunderstanding in Spark? Is it not correctly configured? Surely I should be getting a smaller error than that?
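A minimal Scala sketch of the MLlib tuning mentioned above (step size 0.5, 1200 iterations), assuming an RDD[LabeledPoint] named trainingData; the original scripts in the pastebins are Python, so this only mirrors the idea:

    import org.apache.spark.mllib.regression.LinearRegressionWithSGD

    // Hedged sketch: train with the tuned parameters, then compute the MSE.
    val model = LinearRegressionWithSGD.train(trainingData, 1200, 0.5)
    val mse = trainingData
      .map(p => math.pow(model.predict(p.features) - p.label, 2))
      .mean()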
Re: How to use more executors
...not sure when it will be reviewed... but for now you can work around it by allowing multiple worker instances on a single machine: http://spark.apache.org/docs/latest/spark-standalone.html (search for SPARK_WORKER_INSTANCES). Best, -- Nan Zhu http://codingcat.me

On Wednesday, January 21, 2015 at 6:50 PM, Larry Liu wrote: Will SPARK-1706 be included in the next release?

On Wed, Jan 21, 2015 at 2:50 PM, Ted Yu yuzhih...@gmail.com wrote: Please see SPARK-1706

On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com wrote: I tried to submit a job with --conf spark.cores.max=6 or --total-executor-cores 6 on a standalone cluster, but I don't see more than 1 executor on each worker. I am wondering how to use multiple executors when submitting jobs. Thanks, larry
Announcing SF / East Bay Area Stream Processing Meetup
Hi All, I have been running the Bay Area Storm meetup for almost 2 years. Instead of having separate meetups for Storm and Spark, I changed the Storm meetup to be a stream processing meetup where we can discuss all stream processing frameworks. http://www.meetup.com/Bay-Area-Stream-Processing/events/218816482/?action=detaileventId=218816482 We meet every month in the East Bay (Emeryville, CA). I am looking for someone to give a talk about Spark for the next meetup (Feb 5th). Let me know if you are interested in giving a talk. Thanks, -- Siva Jagadeesan
Re: Exception in connection from worker to worker
I have a temporary fix for my case. My sample file was 2G / 50M lines in size. My initial configuration was 1000 splits. Based on my understanding of distributed algorithms, the number of splits can affect the memory pressure in operations such as distinct and reduceByKey. So I tried reducing the number of splits from 1000 to 100. Now I can run distinct and reduceByKey on files that are 2G / 50M lines. Unfortunately it still doesn't scale well. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-in-connection-from-worker-to-worker-tp20983p21302.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
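A sketch of the change described above, assuming the input is a plain text file (the path is hypothetical). coalesce reduces the partition count without a full shuffle, which is what you want when cutting 1000 splits down to 100:

val lines = sc.textFile("hdfs:///path/to/sample.txt")
val fewer = lines.coalesce(100) // e.g. 1000 partitions down to 100
val uniques = fewer.distinct()
val counts = fewer.map(l => (l, 1)).reduceByKey(_ + _)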
Re: How to use more executors
Will SPARK-1706 be included in next release? On Wed, Jan 21, 2015 at 2:50 PM, Ted Yu yuzhih...@gmail.com wrote: Please see SPARK-1706 On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com wrote: I tried to submit a job with --conf spark.cores.max=6 or --total-executor-cores 6 on a standalone cluster. But I don't see more than 1 executor on each worker. I am wondering how to use multiple executors when submitting jobs. Thanks larry
Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)
I set spark.python.worker.reuse = false and now it seems to run longer than before (it has not crashed yet). However, it is very very slow. How to proceed? On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com wrote: Could you try to disable the new feature of reused worker by: spark.python.worker.reuse = false On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu wrote: Hi, It's a bit of a longer script that runs some deep learning training. Therefore it is a bit hard to wrap up easily. Essentially I am having a loop, in which a gradient is computed on each node and collected (this is where it freezes at some point). grads = zipped_trainData.map(distributed_gradient_computation).collect() The distributed_gradient_computation mainly contains a Theano derived function. The theano function itself is a broadcast variable. Let me know if you need more information. Best, Tassilo On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com wrote: Could you provide a short script to reproduce this issue? On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote: Hi, I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using PySpark. Initially, I was super glad, noticing that Spark 1.2 is way faster than Spark 1.1. However, the initial joy faded quickly when I noticed that all my stuff didn't successfully terminate operations anymore. Using Spark 1.1 it still works perfectly fine, though. Specifically, the execution just freezes without any error output at one point, when calling a joint map() and collect() statement (after having it called many times successfully before in a loop). Any clue? Or do I have to wait for the next version? Best, Tassilo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org The information in this e-mail is intended only for the person to whom it is addressed. If you believe this e-mail was sent to you in error and the e-mail contains patient information, please contact the Partners Compliance HelpLine at http://www.partners.org/complianceline . If the e-mail was sent to you in error but does not contain patient information, please contact the sender and properly dispose of the e-mail.
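For reference, the setting can be passed without touching the script itself, e.g. on the spark-submit command line (the script name is hypothetical):

spark-submit --conf spark.python.worker.reuse=false your_script.py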
reading a csv dynamically
Hi all, I'm currently reading a csv file which has the following format: (String, Double, Double, Double, Double, Double) and can map this with no problems using: val dataRDD = sc.textFile("file.csv").map(_.split(",")).map(a => (Array(a(0)), Array(a(1).toDouble, a(2).toDouble), a(3), Array(a(4).toDouble, a(5).toDouble))) What I would like to do: because the input file may have a different number of fields, i.e. it might have an extra double which needs to go in the first array of doubles, i.e. (String, Double, Double, Double, Double, Double, Double), which would see my map as: val dataRDD = sc.textFile("file.csv").map(_.split(",")).map(a => (Array(a(0)), Array(a(1).toDouble, a(2).toDouble, a(3).toDouble), a(4), Array(a(5).toDouble, a(6).toDouble))) Is there a way I can make this map more dynamic, i.e. if I create vals: val Array_1 = 3 val Array_2 = 2 then use these to pick up the values for array 1, which we know should contain 3 values, and say okay, give me a(1) through to a(3)? Thanks in advance -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
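One way to answer this, sketched in Scala: drive the map with slice() and the two length vals, so the same code handles both layouts. The names and field positions below follow the examples in the question and are otherwise illustrative:

val array1Len = 3 // doubles in the first array (2 for the shorter layout)
val array2Len = 2 // doubles in the second array

val dataRDD = sc.textFile("file.csv")
  .map(_.split(","))
  .map { a =>
    val first  = a.slice(1, 1 + array1Len).map(_.toDouble)  // a(1) through a(array1Len)
    val tag    = a(1 + array1Len)                           // the lone String field
    val second = a.slice(2 + array1Len, 2 + array1Len + array2Len).map(_.toDouble)
    (Array(a(0)), first, tag, second)
  }

With array1Len = 2 this reproduces the first map in the question; with array1Len = 3 it reproduces the second.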
Re: How to use more executors
Please see SPARK-1706 On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com wrote: I tried to submit a job with --conf spark.cores.max=6 or --total-executor-cores 6 on a standalone cluster. But I don't see more than 1 executor on each worker. I am wondering how to use multiple executors when submitting jobs. Thanks larry
spark 1.1.0 save data to hdfs failed
Hi, I used the following fragment of a scala program to save data to hdfs: contextAwareEvents .map(e => (new AvroKey(e), null)) .saveAsNewAPIHadoopFile("hdfs://" + masterHostname + ":9000/ETL/output/" + dateDir, classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], job.getConfiguration) But it failed with the following error messages. Is there anyone who can help? Thanks. Ey-Chih Chow = Exception in thread main java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: ip-10-33-140-157/10.33.140.157; destination host is: ec2-54-203-58-2.us-west-2.compute.amazonaws.com:9000; at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764) at org.apache.hadoop.ipc.Client.call(Client.java:1415) at org.apache.hadoop.ipc.Client.call(Client.java:1364) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1925) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1079) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1075) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1075) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400) at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:900) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832) at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:101) at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala) ... 6 more Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1055) at org.apache.hadoop.ipc.Client$Connection.run(Client.java:950) === -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-0-save-data-to-hdfs-failed-tp21305.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark 1.1.0 save data to hdfs failed
What hdfs release are you using ? Can you check namenode log around time of error below to see if there is some clue ? Cheers On Wed, Jan 21, 2015 at 4:51 PM, ey-chih chow eyc...@hotmail.com wrote: Hi, I used the following fragment of a scala program to save data to hdfs: contextAwareEvents .map(e = (new AvroKey(e), null)) .saveAsNewAPIHadoopFile(hdfs:// + masterHostname + :9000/ETL/output/ + dateDir, classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], job.getConfiguration) But it failed with the following error messages. Is there any people who can help? Thanks. Ey-Chih Chow = Exception in thread main java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: ip-10-33-140-157/10.33.140.157; destination host is: ec2-54-203-58-2.us-west-2.compute.amazonaws.com:9000; at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764) at org.apache.hadoop.ipc.Client.call(Client.java:1415) at org.apache.hadoop.ipc.Client.call(Client.java:1364) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1925) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1079) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1075) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1075) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400) at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:900) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832) at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:101) at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala) ... 
6 more Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1055) at org.apache.hadoop.ipc.Client$Connection.run(Client.java:950) === -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-0-save-data-to-hdfs-failed-tp21305.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to use more executors
I tried to submit a job with --conf spark.cores.max=6 or --total-executor-cores 6 on a standalone cluster. But I don't see more than 1 executor on each worker. I am wondering how to use multiple executors when submitting jobs. Thanks larry
Re: Discourse: A proposed alternative to the Spark User list
I think a few things need to be laid out clearly: 1. This mailing list is the “official” user discussion platform. That is, it is sponsored and managed by the ASF. 2. Users are free to organize independent discussion platforms focusing on Spark, and there is already one such platform in Stack Overflow under the apache-spark and related tags. Stack Overflow works quite well. 3. The ASF will not agree to deprecating or migrating this user list to a platform that they do not control. 4. This mailing list has grown to an unwieldy size and discussions are hard to find or follow; discussion tooling is also lacking. We want to improve the utility and user experience of this mailing list. 5. We don’t want to fragment this “official” discussion community. 6. Nabble is an independent product not affiliated with the ASF. It offers a slightly better interface to the Apache mailing list archives. So to respond to some of your points, pzecevic: Apache user group could be frozen (not accepting new questions, if that’s possible) and redirect users to Stack Overflow (automatic reply?). From what I understand of the ASF’s policies, this is not possible. :( This mailing list must remain the official Spark user discussion platform. Other thing, about new Stack Exchange site I proposed earlier. If a new site is created, there is no problem with guidelines, I think, because Spark community can apply different guidelines for the new site. I think Stack Overflow and the various Spark tags are working fine. I don’t see a compelling need for a Stack Exchange dedicated to Spark, either now or in the near future. Also, I doubt a Spark-specific site can pass the 4 tests in the Area 51 FAQ http://area51.stackexchange.com/faq: - Almost all Spark questions are on-topic for Stack Overflow - Stack Overflow already exists, it already has a tag for Spark, and nobody is complaining - You’re not creating such a big group that you don’t have enough experts to answer all possible questions - There’s a high probability that users of Stack Overflow would enjoy seeing the occasional question about Spark I think complaining won’t be sufficient. :) Someone expressed a concern that they won’t allow creating a project-specific site, but there already exist some project-specific sites, like Tor, Drupal, Ubuntu… The communities for these projects are many, many times larger than the Spark community is or likely ever will be, simply due to the nature of the problems they are solving. What we need is an improvement to this mailing list. We need better tooling than Nabble to sit on top of the Apache archives, and we also need some way to control the volume and quality of mail on the list so that it remains a useful resource for the majority of users. Nick On Wed Jan 21 2015 at 3:13:21 PM pzecevic petar.zece...@gmail.com wrote: Hi, I tried to find the last reply by Nick Chammas (that I received in the digest) using the Nabble web interface, but I cannot find it (perhaps he didn't reply directly to the user list?). That's one example of Nabble's usability. Anyhow, I wanted to add my two cents... Apache user group could be frozen (not accepting new questions, if that's possible) and redirect users to Stack Overflow (automatic reply?). Old questions remain (and are searchable) on Nabble, new questions go to Stack Exchange, so no need for migration. That's the idea, at least, as I'm not sure if that's technically doable... Is it? dev mailing list could perhaps stay on Nabble (it's not that busy), or have a special tag on Stack Exchange. 
Other thing, about new Stack Exchange site I proposed earlier. If a new site is created, there is no problem with guidelines, I think, because Spark community can apply different guidelines for the new site. There is a FAQ about creating new sites: http://area51.stackexchange. com/faq It says: Stack Exchange sites are free to create and free to use. All we ask is that you have an enthusiastic, committed group of expert users who check in regularly, asking and answering questions. I think this requirement is satisfied... Someone expressed a concern that they won't allow creating a project-specific site, but there already exist some project-specific sites, like Tor, Drupal, Ubuntu... Later, though, the FAQ also says: If Y already exists, it already has a tag for X, and nobody is complaining (then you should not create a new site). But we could complain :) The advantage of having a separate site is that users, who should have more privileges, would need to earn them through Spark questions and answers only. The other thing, already mentioned, is that the community could create Spark specific guidelines. There are also 'meta' sites for asking questions like this one, etc. There is a process for starting a site - it's not instantaneous. New site needs to go through private beta and public beta, so that could be a
Re: Saving a mllib model in Spark SQL
Hey, Thanks Xiangrui Meng and Cheng Lian for your valuable suggestions. It works! Divyansh Jain. On Tue, January 20, 2015 2:49 pm, Xiangrui Meng wrote: You can save the cluster centers as a SchemaRDD of two columns (id: Int, center: Array[Double]). When you load it back, you can construct the k-means model from its cluster centers. -Xiangrui On Tue, Jan 20, 2015 at 11:55 AM, Cheng Lian lian.cs@gmail.com wrote: This is because KMeanModel is neither a built-in type nor a user defined type recognized by Spark SQL. I think you can write your own UDT version of KMeansModel in this case. You may refer to o.a.s.mllib.linalg.Vector and o.a.s.mllib.linalg.VectorUDT as an example. Cheng On 1/20/15 5:34 AM, Divyansh Jain wrote: Hey people, I have run into some issues regarding saving the k-means mllib model in Spark SQL by converting to a schema RDD. This is what I am doing: case class Model(id: String, model: org.apache.spark.mllib.clustering.KMeansModel) import sqlContext.createSchemaRDD val rowRdd = sc.makeRDD(Seq(id, model)).map(p => Model(id, model)) This is the error that I get: scala.MatchError: org.apache.spark.mllib.classification.ClassificationModel (of class scala.reflect.internal.Types$TypeRef$anon$6) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:53) at org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:64) at org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:62) at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50) at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44) at org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229) at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94) Any help would be appreciated. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Saving-a-mllib-model-in-Spark-SQL-tp21264.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
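A minimal sketch of the approach Xiangrui describes, written against the Spark 1.2 era SchemaRDD/Parquet APIs. The Center case class, the path, and the exact way the array column comes back from Parquet are assumptions that may need adjusting for your version:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vectors

case class Center(id: Int, center: Array[Double]) // hypothetical helper class

// Save: one row per cluster center.
import sqlContext.createSchemaRDD
val centers = sc.parallelize(model.clusterCenters.zipWithIndex.map {
  case (v, i) => Center(i, v.toArray)
})
centers.saveAsParquetFile("/tmp/kmeans-centers.parquet") // illustrative path

// Load: rebuild the model from its stored centers.
val rows = sqlContext.parquetFile("/tmp/kmeans-centers.parquet")
val restoredCenters = rows
  .map(r => (r.getInt(0), r(1).asInstanceOf[Seq[Double]].toArray))
  .collect()
  .sortBy(_._1)
  .map(p => Vectors.dense(p._2))
val restored = new KMeansModel(restoredCenters)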
Re: Is Apache Spark less accurate than Scikit Learn?
Ah I see, thanks! I was just confused because given the same configuration, I would have thought that Spark and Scikit would give more similar results, but I guess this is simply not the case (as in your example, in order to get spark to give an mse sufficiently close to scikit's you have to give it a significantly larger step and iteration count). Would that then be a result of MLLib and Scikit differing slightly in their exact implementation of the optimizer? Or rather a case of (as you say) Scikit being a far more mature system (and therefore that MLLib would 'get better' over time)? Surely it is far from ideal that to get the same results you need more iterations (IE more computation), or do you think that that is simply coincidence and that given a different model/dataset it may be the other way around? I ask because I encountered this situation on other, larger datasets, so this is not an isolated case (though being the simplest example I could think of I would imagine that it's somewhat indicative of general behaviour) On Thu, Jan 22, 2015 at 1:57 AM, Robin East robin.e...@xense.co.uk wrote: I don’t get those results. I get: spark 0.14 scikit-learn0.85 The scikit-learn mse is due to the very low eta0 setting. Tweak that to 0.1 and push iterations to 400 and you get a mse ~= 0. Of course the coefficients are both ~1 and the intercept ~0. Similarly if you change the mllib step size to 0.5 and number of iterations to 1200 you again get a very low mse. One of the issues with SGD is you have to tweak these parameters to tune the algorithm. FWIW I wouldn’t see Spark MLlib as a replacement for scikit-learn. MLLib is nowhere as mature as scikit learn. However if you have large datasets that won’t sensibly fit the scikit-learn in-core model MLLib is one of the top choices. Similarly if you are running proof of concepts that you are eventually going to scale up to production environments then there is a definite argument for using MLlib at both the PoC and production stages. On 21 Jan 2015, at 20:39, JacquesH jaaksem...@gmail.com wrote: I've recently been trying to get to know Apache Spark as a replacement for Scikit Learn, however it seems to me that even in simple cases, Scikit converges to an accurate model far faster than Spark does. For example I generated 1000 data points for a very simple linear function (z=x+y) with the following script: http://pastebin.com/ceRkh3nb I then ran the following Scikit script: http://pastebin.com/1aECPfvq And then this Spark script: (with spark-submit filename, no other arguments) http://pastebin.com/s281cuTL Strangely though, the error given by spark is an order of magnitude larger than that given by Scikit (0.185 and 0.045 respectively) despite the two models having a nearly identical setup (as far as I can tell) I understand that this is using SGD with very few iterations and so the results may differ but I wouldn't have thought that it would be anywhere near such a large difference or such a large error, especially given the exceptionally simple data. Is there something I'm misunderstanding in Spark? Is it not correctly configured? Surely I should be getting a smaller error than that? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-Apache-Spark-less-accurate-than-Scikit-Learn-tp21301.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is there a way to delete hdfs file/directory using spark API?
Hi, all. I wonder how to delete an HDFS file/directory using the Spark API?
Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)
Because that you have large broadcast, they need to be loaded into Python worker for each tasks, if the worker is not reused. We will really appreciate that if you could provide a short script to reproduce the freeze, then we can investigate the root cause and fix it. Also, fire a JIRA for it, thanks! On Wed, Jan 21, 2015 at 4:56 PM, Tassilo Klein tjkl...@gmail.com wrote: I set spark.python.worker.reuse = false and now it seems to run longer than before (it has not crashed yet). However, it is very very slow. How to proceed? On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com wrote: Could you try to disable the new feature of reused worker by: spark.python.worker.reuse = false On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu wrote: Hi, It's a bit of a longer script that runs some deep learning training. Therefore it is a bit hard to wrap up easily. Essentially I am having a loop, in which a gradient is computed on each node and collected (this is where it freezes at some point). grads = zipped_trainData.map(distributed_gradient_computation).collect() The distributed_gradient_computation mainly contains a Theano derived function. The theano function itself is a broadcast variable. Let me know if you need more information. Best, Tassilo On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com wrote: Could you provide a short script to reproduce this issue? On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote: Hi, I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using PySpark. Initially, I was super glad, noticing that Spark 1.2 is way faster than Spark 1.1. However, the initial joy faded quickly when I noticed that all my stuff didn't successfully terminate operations anymore. Using Spark 1.1 it still works perfectly fine, though. Specifically, the execution just freezes without any error output at one point, when calling a joint map() and collect() statement (after having it called many times successfully before in a loop). Any clue? Or do I have to wait for the next version? Best, Tassilo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org The information in this e-mail is intended only for the person to whom it is addressed. If you believe this e-mail was sent to you in error and the e-mail contains patient information, please contact the Partners Compliance HelpLine at http://www.partners.org/complianceline . If the e-mail was sent to you in error but does not contain patient information, please contact the sender and properly dispose of the e-mail. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)
What do you suggest? Should I send you the script so you can run it yourself? Yes, my broadcast variables are fairly large (1.7 MBytes). On Wed, Jan 21, 2015 at 8:20 PM, Davies Liu dav...@databricks.com wrote: Because that you have large broadcast, they need to be loaded into Python worker for each tasks, if the worker is not reused. We will really appreciate that if you could provide a short script to reproduce the freeze, then we can investigate the root cause and fix it. Also, fire a JIRA for it, thanks! On Wed, Jan 21, 2015 at 4:56 PM, Tassilo Klein tjkl...@gmail.com wrote: I set spark.python.worker.reuse = false and now it seems to run longer than before (it has not crashed yet). However, it is very very slow. How to proceed? On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com wrote: Could you try to disable the new feature of reused worker by: spark.python.worker.reuse = false On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu wrote: Hi, It's a bit of a longer script that runs some deep learning training. Therefore it is a bit hard to wrap up easily. Essentially I am having a loop, in which a gradient is computed on each node and collected (this is where it freezes at some point). grads = zipped_trainData.map(distributed_gradient_computation).collect() The distributed_gradient_computation mainly contains a Theano derived function. The theano function itself is a broadcast variable. Let me know if you need more information. Best, Tassilo On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com wrote: Could you provide a short script to reproduce this issue? On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote: Hi, I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using PySpark. Initially, I was super glad, noticing that Spark 1.2 is way faster than Spark 1.1. However, the initial joy faded quickly when I noticed that all my stuff didn't successfully terminate operations anymore. Using Spark 1.1 it still works perfectly fine, though. Specifically, the execution just freezes without any error output at one point, when calling a joint map() and collect() statement (after having it called many times successfully before in a loop). Any clue? Or do I have to wait for the next version? Best, Tassilo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org The information in this e-mail is intended only for the person to whom it is addressed. If you believe this e-mail was sent to you in error and the e-mail contains patient information, please contact the Partners Compliance HelpLine at http://www.partners.org/complianceline . If the e-mail was sent to you in error but does not contain patient information, please contact the sender and properly dispose of the e-mail.
Re: reading a csv dynamically
Yes, I think you need to create one map first which will keep the number of values in every line. Then you can group all the records with the same number of values, so you know how many types of arrays you will have: val dataRDD = sc.textFile("file.csv") val dataLengthRDD = dataRDD.map(line => (line.split(",").length, line)) val groupedData = dataLengthRDD.groupByKey() Now you can process groupedData, as it will have all the arrays of length x in one RDD. groupByKey([numTasks]): when called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. I hope this helps. Regards Pankaj Infoshore Software India -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304p21307.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark 1.2 – How to change Default (Random) port ….
Hello, Recently, I have upgraded my setup to Spark 1.2 from Spark 1.1. I have a 4-node Ubuntu Spark cluster. With Spark 1.1, I used to write a Spark Scala program in Eclipse on my Windows development host and submit the job to the Ubuntu cluster from Eclipse (the Windows machine). As not all ports between the Spark cluster and the development machine are open on my network, I set the Spark process ports to allowed ports. On Spark 1.1 this works perfectly. When I try to run the same program with the same user-defined ports on the Spark 1.2 cluster, it gives me a connection timeout for port *56117*. I referred to the Spark 1.2 configuration page (http://spark.apache.org/docs/1.2.0/configuration.html) but there are no new ports mentioned. *Here is my code for reference:* val conf = new SparkConf() .setMaster(sparkMaster) .setAppName("Spark SVD") .setSparkHome("/usr/local/spark") .setJars(jars) .set("spark.driver.host", "consb2a") // Windows host (development machine) .set("spark.driver.port", "51810") .set("spark.fileserver.port", "51811") .set("spark.broadcast.port", "51812") .set("spark.replClassServer.port", "51813") .set("spark.blockManager.port", "51814") .set("spark.executor.port", "51815") .set("spark.executor.memory", "2g") .set("spark.driver.memory", "4g") val sc = new SparkContext(conf) *Here is the exception:* 15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager wynchcs217.wyn.cnw.co.nz:37173 with 1059.9 MB RAM, BlockManagerId(2, wynchcs217.wyn.cnw.co.nz, 37173) 15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager wynchcs219.wyn.cnw.co.nz:53850 with 1059.9 MB RAM, BlockManagerId(1, wynchcs219.wyn.cnw.co.nz, 53850) 15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager wynchcs220.wyn.cnw.co.nz:35670 with 1060.3 MB RAM, BlockManagerId(0, wynchcs220.wyn.cnw.co.nz, 35670) 15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager wynchcs218.wyn.cnw.co.nz:46890 with 1059.9 MB RAM, BlockManagerId(3, wynchcs218.wyn.cnw.co.nz, 46890) 15/01/21 15:52:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, wynchcs217.wyn.cnw.co.nz): java.io.IOException: Connecting to CONSB2A.cnw.co.nz/143.96.130.27:56117 timed out (12 ms) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:188) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334) at java.util.concurrent.FutureTask.run(FutureTask.java:166) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:701) 15/01/21 15:52:23 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 2, wynchcs220.wyn.cnw.co.nz, NODE_LOCAL, 1366 bytes) 15/01/21 15:55:35 INFO TaskSchedulerImpl: Cancelling stage 0 15/01/21 15:55:35 INFO TaskSchedulerImpl: Stage 0 was cancelled 15/01/21 15:55:35 INFO DAGScheduler: Job 0 failed: count at RowMatrix.scala:76, took 689.331309 s Exception
in thread main org.apache.spark.SparkException: Job 0 cancelled because Stage 0 was cancelled Can you please let me know how can I define the port 56117 to some other port ? Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-How-to-change-Default-Random-port-tp21306.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Discourse: A proposed alternative to the Spark User list
Very well stated. Thanks for putting in the effort to formalize your thoughts of which I agree entirely. How are these type of decisions made traditionally in the Spark community? Is there a formal process? What's the next step? Thanks again From: nicholas.cham...@gmail.com Date: Thu, 22 Jan 2015 02:55:33 + Subject: Re: Discourse: A proposed alternative to the Spark User list To: petar.zece...@gmail.com; user@spark.apache.org I think a few things need to be laid out clearly: This mailing list is the “official” user discussion platform. That is, it is sponsored and managed by the ASF. Users are free to organize independent discussion platforms focusing on Spark, and there is already one such platform in Stack Overflow under the apache-spark and related tags. Stack Overflow works quite well. The ASF will not agree to deprecating or migrating this user list to a platform that they do not control. This mailing list has grown to an unwieldy size and discussions are hard to find or follow; discussion tooling is also lacking. We want to improve the utility and user experience of this mailing list. We don’t want to fragment this “official” discussion community. Nabble is an independent product not affiliated with the ASF. It offers a slightly better interface to the Apache mailing list archives. So to respond to some of your points, pzecevic: Apache user group could be frozen (not accepting new questions, if that’s possible) and redirect users to Stack Overflow (automatic reply?). From what I understand of the ASF’s policies, this is not possible. :( This mailing list must remain the official Spark user discussion platform. Other thing, about new Stack Exchange site I proposed earlier. If a new site is created, there is no problem with guidelines, I think, because Spark community can apply different guidelines for the new site. I think Stack Overflow and the various Spark tags are working fine. I don’t see a compelling need for a Stack Exchange dedicated to Spark, either now or in the near future. Also, I doubt a Spark-specific site can pass the 4 tests in the Area 51 FAQ: Almost all Spark questions are on-topic for Stack Overflow Stack Overflow already exists, it already has a tag for Spark, and nobody is complaining You’re not creating such a big group that you don’t have enough experts to answer all possible questions There’s a high probability that users of Stack Overflow would enjoy seeing the occasional question about Spark I think complaining won’t be sufficient. :) Someone expressed a concern that they won’t allow creating a project-specific site, but there already exist some project-specific sites, like Tor, Drupal, Ubuntu… The communities for these projects are many, many times larger than the Spark community is or likely ever will be, simply due to the nature of the problems they are solving. What we need is an improvement to this mailing list. We need better tooling than Nabble to sit on top of the Apache archives, and we also need some way to control the volume and quality of mail on the list so that it remains a useful resource for the majority of users. Nick On Wed Jan 21 2015 at 3:13:21 PM pzecevic petar.zece...@gmail.com wrote: Hi, I tried to find the last reply by Nick Chammas (that I received in the digest) using the Nabble web interface, but I cannot find it (perhaps he didn't reply directly to the user list?). That's one example of Nabble's usability. Anyhow, I wanted to add my two cents... 
Apache user group could be frozen (not accepting new questions, if that's possible) and redirect users to Stack Overflow (automatic reply?). Old questions remain (and are searchable) on Nabble, new questions go to Stack Exchange, so no need for migration. That's the idea, at least, as I'm not sure if that's technically doable... Is it? dev mailing list could perhaps stay on Nabble (it's not that busy), or have a special tag on Stack Exchange. Other thing, about new Stack Exchange site I proposed earlier. If a new site is created, there is no problem with guidelines, I think, because Spark community can apply different guidelines for the new site. There is a FAQ about creating new sites: http://area51.stackexchange.com/faq It says: Stack Exchange sites are free to create and free to use. All we ask is that you have an enthusiastic, committed group of expert users who check in regularly, asking and answering questions. I think this requirement is satisfied... Someone expressed a concern that they won't allow creating a project-specific site, but there already exist some project-specific sites, like Tor, Drupal, Ubuntu... Later, though, the FAQ also says: If Y already exists, it already has a tag for X, and nobody is complaining (then you should not create a new site). But we could complain :) The advantage of having a separate site is that users, who should have more privileges, would need to earn them through Spark
Re: Confused why I'm losing workers/executors when writing a large file to S3
I’m getting the same issue on Spark 1.2.0. Despite having set “spark.core.connection.ack.wait.timeout” in spark-defaults.conf and verified in the job UI (port 4040) environment tab, I still get the “no heartbeat in 60 seconds” error. spark.core.connection.ack.wait.timeout=3600 15/01/22 07:29:36 WARN master.Master: Removing worker-20150121231529-numaq1-4-34948 because we got no heartbeat in 60 seconds On 14 Nov, 2014, at 3:04 pm, Reynold Xin r...@databricks.com wrote: Darin, You might want to increase these config options also: spark.akka.timeout 300 spark.storage.blockManagerSlaveTimeoutMs 30 On Thu, Nov 13, 2014 at 11:31 AM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: For one of my Spark jobs, my workers/executors are dying and leaving the cluster. On the master, I see something like the following in the log file. I'm surprised to see the '60' seconds in the master log below because I explicitly set it to '600' (or so I thought) in my spark job (see below). This is happening at the end of my job when I'm trying to persist a large RDD (probably around 300+GB) back to S3 (in 256 partitions). My cluster consists of 6 r3.8xlarge machines. The job successfully works when I'm outputting 100GB or 200GB. If you have any thoughts/insights, it would be appreciated. Thanks. Darin. Here is where I'm setting the 'timeout' in my spark job. SparkConf conf = new SparkConf() .setAppName(SparkSync Application) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.rdd.compress,true) .set(spark.core.connection.ack.wait.timeout,600); On the master, I see the following in the log file. 4/11/13 17:20:39 WARN master.Master: Removing worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 because we got no heartbeat in 60 seconds 14/11/13 17:20:39 INFO master.Master: Removing worker worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 on ip-10-35-184-232.ec2.internal:51877 14/11/13 17:20:39 INFO master.Master: Telling app of lost executor: 2 On a worker, I see something like the following in the log file. 
14/11/13 17:20:58 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362) 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: I/O exception (java.net.SocketException) caught when processing request: Broken pipe 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:32 INFO httpclient.HttpMethodDirector: I/O exception (java.net.SocketException) caught when processing request: Broken pipe 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark 14/11/13 17:21:34 INFO
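Pulling together the settings mentioned in this thread, a sketch of the driver-side configuration (the values are the ones quoted above and are illustrative; spark.storage.blockManagerSlaveTimeoutMs is omitted because its value in the thread looks truncated). Note also that the master's "Removing worker ... no heartbeat in 60 seconds" check is governed by the standalone master's own spark.worker.timeout property rather than any application-level conf, which would explain why the 60 seconds persists regardless of what the job sets; that reading is an assumption worth verifying against the standalone docs:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SparkSync Application")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.timeout", "300")
val sc = new SparkContext(conf)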
Re: Exception in connection from worker to worker
Can you try the following: - Use Kryo Serializer - Enable RDD Compression - Repartition the data (Use hash partition, then all the similar keys will go in the same partition) Thanks Best Regards On Thu, Jan 22, 2015 at 4:05 AM, vantoniuk vita...@jetlore.com wrote: I have temporary fix for my case. My sample file was 2G / 50M lines in size. My initial configuration was 1000 splits. Based on my understanding of distributed algorithms, number of splits can affect the memory pressure in operations such as distinct and reduceByKey. So i tried to reduce the number of splits from 1000 to 100. Now I can run distinct and reduceByKey on files that are 2G / 50M lines. Unfortunately it still doesn't scale well. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-in-connection-from-worker-to-worker-tp20983p21302.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
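A sketch of those three suggestions in one place; the partition count and the pair-RDD shape are illustrative:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 1. Kryo serializer
  .set("spark.rdd.compress", "true")                                     // 2. RDD compression
val sc = new SparkContext(conf)

// 3. Hash-partition so identical keys land in the same partition.
val pairs = sc.textFile("hdfs:///path/to/sample.txt").map(l => (l, 1))
val partitioned = pairs.partitionBy(new HashPartitioner(100))
val reduced = partitioned.reduceByKey(_ + _)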
Re: Spark job for demoing Spark metrics monitoring?
I think you can easily run twitter popular hashtags https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala You can also save the data into a db and visualize it. Thanks Best Regards On Thu, Jan 22, 2015 at 12:37 AM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, I'll be showing our Spark monitoring http://blog.sematext.com/2014/10/07/apache-spark-monitoring/ at the upcoming Spark Summit in NYC. I'd like to run some/any Spark job that really exercises Spark and makes it emit all its various metrics (so the metrics charts are full of data and not blank or flat and boring). Since we don't use Spark at Sematext yet, I was wondering if anyone could recommend some Spark app/job that's easy to run, just to get some Spark job to start emitting various Spark metrics? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/
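The example ships with the Spark distribution and can be launched with run-example; it expects four Twitter OAuth credentials as arguments (check the example's usage string; placeholders below):

./bin/run-example streaming.TwitterPopularTags <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret>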
Re: Is there a way to delete hdfs file/directory using spark API?
There is no direct way of doing it, but you can do something like this (assuming each line of the stream's files holds an HDFS path to be deleted): val hadoopConf = ssc.sparkContext.hadoopConfiguration val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf) val tmp_stream = ssc.textFileStream("/akhld/sigmoid/") tmp_stream.foreachRDD(rdd => { rdd.collect().foreach(path => { try { hdfs.delete(new org.apache.hadoop.fs.Path(path), true) } catch { case e: Exception => println("w00t!! Exception!! HDFS => " + e) } }) }) Thanks Best Regards On Thu, Jan 22, 2015 at 12:15 PM, LinQili lin_q...@outlook.com wrote: Hi, all I wonder how to delete hdfs file/directory using spark API?
RE: spark 1.1.0 save data to hdfs failed
The hdfs release should be hadoop 1.0.4. Ey-Chih Chow Date: Wed, 21 Jan 2015 16:56:25 -0800 Subject: Re: spark 1.1.0 save data to hdfs failed From: yuzhih...@gmail.com To: eyc...@hotmail.com CC: user@spark.apache.org What hdfs release are you using ? Can you check namenode log around time of error below to see if there is some clue ? Cheers On Wed, Jan 21, 2015 at 4:51 PM, ey-chih chow eyc...@hotmail.com wrote: Hi, I used the following fragment of a scala program to save data to hdfs: contextAwareEvents .map(e = (new AvroKey(e), null)) .saveAsNewAPIHadoopFile(hdfs:// + masterHostname + :9000/ETL/output/ + dateDir, classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], job.getConfiguration) But it failed with the following error messages. Is there any people who can help? Thanks. Ey-Chih Chow = Exception in thread main java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: ip-10-33-140-157/10.33.140.157; destination host is: ec2-54-203-58-2.us-west-2.compute.amazonaws.com:9000; at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764) at org.apache.hadoop.ipc.Client.call(Client.java:1415) at org.apache.hadoop.ipc.Client.call(Client.java:1364) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1925) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1079) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1075) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1075) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400) at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:900) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832) at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:101) at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala) ... 
6 more Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1055) at org.apache.hadoop.ipc.Client$Connection.run(Client.java:950) === -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-0-save-data-to-hdfs-failed-tp21305.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there a way to delete hdfs file/directory using spark API?
You can use the Hadoop client API to remove files: https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#delete(org.apache.hadoop.fs.Path, boolean). I don't think Spark has any wrapper on the Hadoop filesystem APIs. On Thu, Jan 22, 2015 at 12:15 PM, LinQili lin_q...@outlook.com wrote: Hi, all I wonder how to delete hdfs file/directory using spark API?
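A minimal Scala sketch of that API from inside a Spark job; the path is hypothetical, and FileSystem.get returns the default filesystem for the job's Hadoop configuration:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val deleted = fs.delete(new Path("/tmp/mydir"), true) // recursive = true
println(s"deleted: $deleted")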
loading utf16le file with sc.textFile
How can I load a utf16le file with BOM using sc.textFile? Right now I get a String that is garbled. I can't find documentation on using a different encoding when loading a text file. Any help is appreciated. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
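One workaround sometimes used is to read the raw Text bytes through hadoopFile and decode them explicitly. This is a sketch with real caveats: TextInputFormat still splits records at raw 0x0A bytes, so it only behaves for simple UTF-16LE content, and the BOM will still be sitting at the start of the first line and must be stripped by hand. For whole-file decoding, sc.binaryFiles plus new String(bytes, "UTF-16LE") may be the safer route:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/path/file.txt") // hypothetical path
val lines = raw.map { case (_, text) =>
  // Text carries the raw line bytes; decode as UTF-16LE instead of the default UTF-8.
  new String(text.getBytes, 0, text.getLength, "UTF-16LE")
}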
Re: dynamically change receiver for a spark stream
thanks for the replies. is this something we can get around? Tried to hack into the code without much success. On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi, I don't think current Spark Streaming support this feature, all the DStream lineage is fixed after the context is started. Also stopping a stream is not supported, instead currently we need to stop the whole streaming context to meet what you want. Thanks Saisai -Original Message- From: jamborta [mailto:jambo...@gmail.com] Sent: Wednesday, January 21, 2015 3:09 AM To: user@spark.apache.org Subject: dynamically change receiver for a spark stream Hi all, we have been trying to setup a stream using a custom receiver that would pick up data from sql databases. we'd like to keep that stream context running and dynamically change the streams on demand, adding and removing streams based on demand. alternativel, if a stream is fixed, is it possible to stop a stream, change to config and start again? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Are these numbers abnormal for spark streaming?
Hi Guys, I've got Spark Streaming set up for a low data rate system (using spark's features for analysis, rather than high throughput). Messages are coming in throughout the day, at around 1-20 per second (finger in the air estimate... not analysed yet). In the spark streaming UI for the application, I'm getting the following after 17 hours.

Streaming
Started at: Tue Jan 20 16:58:43 GMT 2015
Time since start: 18 hours 24 minutes 34 seconds
Network receivers: 2
Batch interval: 2 seconds
Processed batches: 16482
Waiting batches: 1

Receiver statistics (over last 100 processed batches): RmqReceiver-0 (location F) ACTIVE; RmqReceiver-1 (location BR) ACTIVE; no last error for either.

Batch processing statistics (over last 100 processed batches):
Processing Time: last batch 3 s 994 ms; minimum 157 ms; 25th percentile 4 s 16 ms; median 4 s 961 ms; 75th percentile 5 s 3 ms; maximum 5 s 171 ms
Scheduling Delay: last batch 9 h 15 m 4 s; minimum 9 h 10 m 54 s; 25th percentile 9 h 11 m 56 s; median 9 h 12 m 57 s; 75th percentile 9 h 14 m 5 s; maximum 9 h 15 m 4 s
Total Delay: last batch 9 h 15 m 8 s; minimum 9 h 10 m 58 s; 25th percentile 9 h 12 m; median 9 h 13 m 2 s; 75th percentile 9 h 14 m 10 s; maximum 9 h 15 m 8 s

Are these normal? I was wondering what the scheduling delay and total delay terms are, and if it's normal for them to be 9 hours. I've got a standalone spark master and 4 spark nodes. The streaming app has been given 4 cores, and it's using 1 core per worker node. The streaming app is submitted from a 5th machine, and that machine has nothing but the driver running. The worker nodes are running alongside Cassandra (and reading and writing to it). Any insights would be appreciated. Regards, Ashic.
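For what it's worth, the figures above are self-consistent and point at the batch interval being too short for the processing time: batches take roughly 4-5 seconds to process but arrive every 2 seconds, so the backlog (the scheduling delay) grows without bound. A rough check from the reported numbers:

elapsed time ≈ 18 h 24 m 34 s ≈ 66,274 s
processed batches × batch interval = 16,482 × 2 s = 32,964 s
backlog ≈ 66,274 s − 32,964 s = 33,310 s ≈ 9 h 15 m

which matches the reported scheduling delay almost exactly.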
Re: dynamically change receiver for a spark stream
we were thinking along the same line, that is to fix the number of streams and change the input and output channels dynamically. But could not make it work (seems that the receiver is not allowing any change in the config after it started). thanks, On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas gerard.m...@gmail.com wrote: One possible workaround could be to orchestrate launch/stopping of Streaming jobs on demand as long as the number of jobs/streams stay within the boundaries of the resources (cores) you've available. e.g. if you're using Mesos, Marathon offers a REST interface to manage job lifecycle. You will still need to solve the dynamic configuration through some alternative channel. On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com wrote: thanks for the replies. is this something we can get around? Tried to hack into the code without much success. On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi, I don't think current Spark Streaming support this feature, all the DStream lineage is fixed after the context is started. Also stopping a stream is not supported, instead currently we need to stop the whole streaming context to meet what you want. Thanks Saisai -Original Message- From: jamborta [mailto:jambo...@gmail.com] Sent: Wednesday, January 21, 2015 3:09 AM To: user@spark.apache.org Subject: dynamically change receiver for a spark stream Hi all, we have been trying to setup a stream using a custom receiver that would pick up data from sql databases. we'd like to keep that stream context running and dynamically change the streams on demand, adding and removing streams based on demand. alternativel, if a stream is fixed, is it possible to stop a stream, change to config and start again? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: dynamically change receiver for a spark stream
Hi Tamas, I meant not changing the receivers, but starting/stopping the Streaming jobs. So you would have a 'small' Streaming job for a subset of streams that you'd configure-start-stop on demand. I haven't tried myself yet, but I think it should also be possible to create a Streaming Job from the Spark Job Server ( https://github.com/spark-jobserver/spark-jobserver). Then you would have a REST interface that even gives you the possibility of passing a configuration. -kr, Gerard. On Wed, Jan 21, 2015 at 11:54 AM, Tamas Jambor jambo...@gmail.com wrote: we were thinking along the same line, that is to fix the number of streams and change the input and output channels dynamically. But could not make it work (seems that the receiver is not allowing any change in the config after it started). thanks, On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas gerard.m...@gmail.com wrote: One possible workaround could be to orchestrate launch/stopping of Streaming jobs on demand as long as the number of jobs/streams stay within the boundaries of the resources (cores) you've available. e.g. if you're using Mesos, Marathon offers a REST interface to manage job lifecycle. You will still need to solve the dynamic configuration through some alternative channel. On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com wrote: thanks for the replies. is this something we can get around? Tried to hack into the code without much success. On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi, I don't think current Spark Streaming support this feature, all the DStream lineage is fixed after the context is started. Also stopping a stream is not supported, instead currently we need to stop the whole streaming context to meet what you want. Thanks Saisai -Original Message- From: jamborta [mailto:jambo...@gmail.com] Sent: Wednesday, January 21, 2015 3:09 AM To: user@spark.apache.org Subject: dynamically change receiver for a spark stream Hi all, we have been trying to setup a stream using a custom receiver that would pick up data from sql databases. we'd like to keep that stream context running and dynamically change the streams on demand, adding and removing streams based on demand. alternativel, if a stream is fixed, is it possible to stop a stream, change to config and start again? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)
I'm also facing the same issue. is this a bug? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278p21283.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: dynamically change receiver for a spark stream
One possible workaround could be to orchestrate launch/stopping of Streaming jobs on demand as long as the number of jobs/streams stay within the boundaries of the resources (cores) you've available. e.g. if you're using Mesos, Marathon offers a REST interface to manage job lifecycle. You will still need to solve the dynamic configuration through some alternative channel. On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com wrote: thanks for the replies. is this something we can get around? Tried to hack into the code without much success. On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi, I don't think current Spark Streaming support this feature, all the DStream lineage is fixed after the context is started. Also stopping a stream is not supported, instead currently we need to stop the whole streaming context to meet what you want. Thanks Saisai -Original Message- From: jamborta [mailto:jambo...@gmail.com] Sent: Wednesday, January 21, 2015 3:09 AM To: user@spark.apache.org Subject: dynamically change receiver for a spark stream Hi all, we have been trying to setup a stream using a custom receiver that would pick up data from sql databases. we'd like to keep that stream context running and dynamically change the streams on demand, adding and removing streams based on demand. alternativel, if a stream is fixed, is it possible to stop a stream, change to config and start again? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: dynamically change receiver for a spark stream
Hi Gerard, thanks, that makes sense. I'll try that out. Tamas On Wed, Jan 21, 2015 at 11:14 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Tamas, I meant not changing the receivers, but starting/stopping the Streaming jobs. So you would have a 'small' Streaming job for a subset of streams that you'd configure-start-stop on demand. I haven't tried myself yet, but I think it should also be possible to create a Streaming Job from the Spark Job Server ( https://github.com/spark-jobserver/spark-jobserver). Then you would have a REST interface that even gives you the possibility of passing a configuration. -kr, Gerard. On Wed, Jan 21, 2015 at 11:54 AM, Tamas Jambor jambo...@gmail.com wrote: we were thinking along the same line, that is to fix the number of streams and change the input and output channels dynamically. But could not make it work (seems that the receiver is not allowing any change in the config after it started). thanks, On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas gerard.m...@gmail.com wrote: One possible workaround could be to orchestrate launch/stopping of Streaming jobs on demand as long as the number of jobs/streams stay within the boundaries of the resources (cores) you've available. e.g. if you're using Mesos, Marathon offers a REST interface to manage job lifecycle. You will still need to solve the dynamic configuration through some alternative channel. On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com wrote: thanks for the replies. is this something we can get around? Tried to hack into the code without much success. On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi, I don't think current Spark Streaming support this feature, all the DStream lineage is fixed after the context is started. Also stopping a stream is not supported, instead currently we need to stop the whole streaming context to meet what you want. Thanks Saisai -Original Message- From: jamborta [mailto:jambo...@gmail.com] Sent: Wednesday, January 21, 2015 3:09 AM To: user@spark.apache.org Subject: dynamically change receiver for a spark stream Hi all, we have been trying to setup a stream using a custom receiver that would pick up data from sql databases. we'd like to keep that stream context running and dynamically change the streams on demand, adding and removing streams based on demand. alternativel, if a stream is fixed, is it possible to stop a stream, change to config and start again? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RangePartitioner
Hi Rishi, If you look in the Spark UI, have any executors registered? Are you able to collect a jstack of the driver process? -Sandy On Tue, Jan 20, 2015 at 9:07 PM, Rishi Yadav ri...@infoobjects.com wrote: I am joining two tables as below, the program stalls at below log line and never proceeds. What might be the issue and possible solution? INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79 Table 1 has 450 columns Table2 has 100 columns Both tables have few million rows val table1= myTable1.as('table1) val table2= myTable2.as('table2) val results= table1.join(table2,LeftOuter,Some(table1.Id.attr === table2.id.attr )) println(results.count()) Thanks and Regards, Rishi @meditativesoul
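For reference, a driver jstack can be captured with the standard JDK tools on the machine running the driver; a sketch, where the PID is whatever jps reports for your application:

    jps -lm                               # find the PID of the driver JVM
    jstack <driver-pid> > driver.jstack   # dump all thread stacks to a file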
Re: spark 1.2 three times slower than spark 1.1, why?
thanks, Sean. I don't quite understand you have *more *partitions across *more *workers. It's within the same cluster, and the same data, thus I think the same partition, the same workers. we switched from spark 1.1 to 1.2, then it's 3x slower. (We upgrade from CDH 5.2.1 to CDH 5.3, hence spark 1.1 to 1.2, and found the problem. then we installed a standalone spark 1.1, stop the 1.2, run the same script, it's 3x faster. stop 1.1, start 1.2, 3x slower again) 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
Re: spark 1.2 three times slower than spark 1.1, why?
maybe you mean different spark-submit script? we also use the same spark-submit script, thus the same memory, cores, etc configuration. 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
Re: How to share a NonSerializable variable among tasks in the same worker node?
Singletons aren't hacks; a singleton can be an entirely appropriate pattern for this. What exception do you get? From Spark or your code? I think this pattern is orthogonal to using Spark. On Jan 21, 2015 8:11 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote: In case someone has the same problem: The singleton hack works for me sometimes, sometimes it doesn't in spark 1.2.0, that is, sometimes I get a NullPointerException. Anyway, if you really need to work with big indexes and you want to have the smallest amount of communication between master and nodes, as well as if you have RAM available just for one instance of the indexes data per machine, then I suggest you use spark with memcached. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21282.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark 1.2 three times slower than spark 1.1, why?
I was recently faced with a similar issue, but unfortunately I could not find out why it happened. Here's the jira ticket https://issues.apache.org/jira/browse/SPARK-5081 of my previous post. Please check your shuffle I/O differences between the two in the spark web UI, because it can possibly be related to my case. Thanks Kevin --- Original Message --- Sender : Fengyun RAO raofeng...@gmail.com Date : 2015-01-21 17:41 (GMT+09:00) Title : Re: spark 1.2 three times slower than spark 1.1, why? maybe you mean different spark-submit script? we also use the same spark-submit script, thus the same memory, cores, etc configuration. 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, "Fengyun RAO" raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
Re: spark 1.2 three times slower than spark 1.1, why?
I don't know how to debug a distributed application; any tools or suggestions? From the spark web UI, the GC time (~0.1 s) and Shuffle Write (11 GB) are similar for spark 1.1 and 1.2, and there are no Shuffle Read and Spill. The only difference is Duration:

              | Min | 25th percentile | Median | 75th percentile | Max
    spark 1.2 | 4 s | 37 s            | 45 s   | 53 s            | 1.9 min
    spark 1.1 | 2 s | 17 s            | 18 s   | 18 s            | 34 s

2015-01-21 16:56 GMT+08:00 Sean Owen so...@cloudera.com: I mean that if you had tasks running on 10 machines now instead of 3 for some reason you would have more than 3 times the read load on your source of data all at once. Same if you made more executors per machine. But from your additional info it does not sound like this is the case. I think you need more debugging to pinpoint what is slower. On Jan 21, 2015 9:30 AM, Fengyun RAO raofeng...@gmail.com wrote: thanks, Sean. I don't quite understand you have more partitions across more workers. It's within the same cluster, and the same data, thus I think the same partition, the same workers. we switched from spark 1.1 to 1.2, then it's 3x slower. (We upgrade from CDH 5.2.1 to CDH 5.3, hence spark 1.1 to 1.2, and found the problem. then we installed a standalone spark 1.1, stop the 1.2, run the same script, it's 3x faster. stop 1.1, start 1.2, 3x slower again) 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
Re: Support for SQL on unions of tables (merge tables?)
Thanks Cheng! For the list, I talked with Michael Armbrust at a recent Spark meetup and his comments were: * For a union of tables, use a view and the Hive metastore * SQLContext might have the directory-traversing logic I need in it already * The union() of sequence files I saw was slow because Spark was probably trying to shuffle the whole union. A similar Spark SQL join will also be slow (or break) unless one runs statistics so that the smaller table can be broadcasted (e.g. see https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options ) I have never used Hive, so I'll have to investigate further. On Tue, Jan 20, 2015 at 1:15 PM, Cheng Lian lian.cs@gmail.com wrote: I think you can resort to a Hive table partitioned by date https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables On 1/11/15 9:51 PM, Paul Wais wrote: Dear List, What are common approaches for addressing over a union of tables / RDDs? E.g. suppose I have a collection of log files in HDFS, one log file per day, and I want to compute the sum of some field over a date range in SQL. Using log schema, I can read each as a distinct SchemaRDD, but I want to union them all and query against one 'table'. If this data were in MySQL, I could have a table for each day of data and use a MyISAM merge table to union these tables together and just query against the merge table. What's nice here is that MySQL persists the merge table, and the merge table is r/w, so one can just update the merge table once per day. (What's not nice is that merge tables scale poorly, backup admin is a pain, and oh hey I'd like to use Spark not MySQL). One naive and untested idea (that achieves implicit persistence): scan an HDFS directory for log files, create one RDD per file, union() the RDDs, then create a Schema RDD from that union(). A few specific questions: * Any good approaches to a merge / union table? (Other than the naive idea above). Preferably with some way to persist that table / RDD between Spark runs. (How does Impala approach this problem?) * Has anybody tried joining against such a union of tables / RDDs on a very large amount of data? When I've tried (non-spark-sql) union()ing Sequence Files, and then join()ing them against another RDD, Spark seems to try to compute the full union before doing any join() computation (and eventually OOMs the cluster because the union of Sequence Files is so big). I haven't tried something similar with Spark SQL. * Are there any plans related to this in the Spark roadmap? (This feature would be a nice compliment to, say, persistent RDD indices for interactive querying). * Related question: are there plans to use Parquet Index Pages to make Spark SQL faster? E.g. log indices over date ranges would be relevant here. All the best, -Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
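For the archive, the naive union idea from the original question might look like this in Spark SQL 1.2; a sketch assuming one Parquet directory per day, with paths and the date list as placeholders (and note nothing here persists between runs; that part still needs the Hive metastore approach above):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    val days = Seq("2015-01-19", "2015-01-20", "2015-01-21")
    val merged = days
      .map(d => sqlContext.parquetFile(s"hdfs:///logs/$d"))
      .reduce(_ unionAll _)            // one logical table over all days
    merged.registerTempTable("logs")
    sqlContext.sql("SELECT SUM(cost) FROM logs")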
Re: Spark Streaming with Kafka
You can probably try the Low Level Consumer option with Spark 1.2: https://github.com/dibbhatt/kafka-spark-consumer This consumer can recover from any underlying failure of the Spark platform or Kafka and either retry or restart the receiver. This has been working nicely for us. Regards, Dibyendu On Wed, Jan 21, 2015 at 7:46 AM, firemonk9 dhiraj.peech...@gmail.com wrote: Hi, I am having similar issues. Have you found any resolution ? Thank you -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
Hello Guys, I've repartitioned my kafkaStream so that it gets evenly distributed among the executors, and the results are better. Still, from the executors page it seems that all 8 cores of only one executor are getting used while the other executors are using just 1 core each. Is this the correct interpretation based on the data below? If so, how can we fix this? [image: Inline image 1] On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das tathagata.das1...@gmail.com wrote: That is kind of expected due to data locality. Though you should see some tasks running on the executors as the data gets replicated to other nodes and can therefore run tasks based on locality. You have two solutions 1. kafkaStream.repartition() to explicitly repartition the received data across the cluster. 2. Create multiple kafka streams and union them together. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com wrote: Thanks Sandy, It was the issue with the no of cores. Another issue I was facing is that tasks are not getting distributed evenly among all executors and are running on the NODE_LOCAL locality level i.e. all the tasks are running on the same executor where my kafkareceiver(s) are running even though other executors are idle. I configured spark.locality.wait=50 instead of the default 3000 ms, which forced the task rebalancing among nodes, let me know if there is a better way to deal with this. On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com wrote: Makes sense, I've also tried it in standalone mode where all 3 workers driver were running on the same 8 core box and the results were similar. Anyways I will share the results in YARN mode with 8 core yarn containers. On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com wrote: When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no? -Sandy On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com wrote: Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver as per the spark application master UI. spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000 On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8 On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8? On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, The command is just for reference but I can confirm that there are 4 executors and a driver as shown in the spark UI page. Each of these machines is an 8 core box with ~15G of ram. On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? 
-Sandy On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm bench-marking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone spark cluster ( https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a spark app running on yarn with 4 executors as shown below. The spark job running inside yarn is 10x slower than the one running on the standalone cluster (even though yarn has more workers); also, in both cases all the executors are in the same datacenter so there shouldn't be any latency. On YARN each 5sec batch is reading data from kafka and processing it in 5sec; on the standalone cluster each 5sec batch is getting processed in 0.4sec. Also, in YARN mode the executors are not being used evenly: vm-13 and vm-14 are running most of the tasks, whereas in standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also do you have any pointers on the reasons the yarn job is 10x slower than the standalone job? Any suggestion is greatly appreciated, Thanks in advance. YARN (5 workers + driver) executor table from the Spark UI (columns: Executor ID, Address, RDD Blocks, Memory Used, Disk Used, Active/Failed/Complete/Total Tasks, Task Time, Input, Shuffle Read, Shuffle Write, Thread Dump); first row: executor 1, vm-18.cloud.com:51796, 0 RDD blocks, 0.0 B / 530.3 MB memory, 0.0 B disk, 1 active / 0 failed / 16 complete / 17 total tasks, 634 ms task time (remaining rows truncated).
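The two options TD lists in the thread above might look roughly like this; a sketch assuming Spark 1.2's KafkaUtils, with the receiver count, consumer group, topic map and partition count as placeholders (the ZooKeeper quorum is taken from the spark-submit command in the thread):

    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaStreams = (1 to 4).map { _ =>
      KafkaUtils.createStream(ssc, "vm.cloud.com:2181/kafka", "my-group", Map("avro" -> 1))
    }
    val unified = ssc.union(kafkaStreams) // option 2: several receivers, spread across executors
      .repartition(16)                    // option 1: spread the received data over all cores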
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
Hi Mukesh, How are you creating your receivers? Could you post the (relevant) code? -kr, Gerard. On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Guys, I've re partitioned my kafkaStream so that it gets evenly distributed among the executors and the results are better. Still from the executors page it seems that only 1 executors all 8 cores are getting used and other executors are using just 1 core. Is this the correct interpretation based on the below data? If so how can we fix this? [image: Inline image 1] On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Thats is kind of expected due to data locality. Though you should see some tasks running on the executors as the data gets replicated to other nodes and can therefore run tasks based on locality. You have two solutions 1. kafkaStream.repartition() to explicitly repartition the received data across the cluster. 2. Create multiple kafka streams and union them together. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com wrote: Thanks Sandy, It was the issue with the no of cores. Another issue I was facing is that tasks are not getting distributed evenly among all executors and are running on the NODE_LOCAL locality level i.e. all the tasks are running on the same executor where my kafkareceiver(s) are running even though other executors are idle. I configured spark.locality.wait=50 instead of the default 3000 ms, which forced the task rebalancing among nodes, let me know if there is a better way to deal with this. On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com wrote: Makes sense, I've also tries it in standalone mode where all 3 workers driver were running on the same 8 core box and the results were similar. Anyways I will share the results in YARN mode with 8 core yarn containers. On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com wrote: When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no? -Sandy On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com wrote: Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a drives as per the spark application master UI. spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000 On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8 On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8? On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, The command is just for reference but I can confirm that there are 4 executors and a driver as shown in the spark UI page. Each of these machines is a 8 core box with ~15G of ram. On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? 
-Sandy On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm bench-marking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone spark cluster ( https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a spark app running on yarn with 4 executors as shown below. The spark job running inside yarn is 10x slower than the one running on the standalone cluster (even though the yarn has more number of workers), also in both the case all the executors are in the same datacenter so there shouldn't be any latency. On YARN each 5sec batch is reading data from kafka and processing it in 5sec on the standalone cluster each 5sec batch is getting processed in 0.4sec. Also, In YARN mode all the executors are not getting used up evenly as vm-13 vm-14 are running most of the tasks whereas in the standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also do you have any pointers on the reasons the yarn job is 10x slower than the standalone job? Any suggestion is greatly appreciated, Thanks in advance. YARN(5 workers + driver)
Re: spark 1.2 three times slower than spark 1.1, why?
thanks JaeBoo, in our case, the shuffle write are similar. 2015-01-21 17:01 GMT+08:00 JaeBoo Jung itsjb.j...@samsung.com: I was recently faced with a similar issue, but unfortunately I could not find out why it happened. Here's jira ticket https://issues.apache.org/jira/browse/SPARK-5081 of my previous post. Please check your shuffle I/O differences between the two in spark web UI because it can be possibly related to my case. Thanks Kevin --- *Original Message* --- *Sender* : Fengyun RAOraofeng...@gmail.com *Date* : 2015-01-21 17:41 (GMT+09:00) *Title* : Re: spark 1.2 three times slower than spark 1.1, why? maybe you mean different spark-submit script? we also use the same spark-submit script, thus the same memory, cores, etc configuration. 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
Re: spark 1.2 three times slower than spark 1.1, why?
btw: Shuffle Write(11 GB) mean 11 GB per Executor, for each task, it's ~40 MB 2015-01-21 17:53 GMT+08:00 Fengyun RAO raofeng...@gmail.com: I don't know how to debug distributed application, any tools or suggestion? but from spark web UI, the GC time (~0.1 s), Shuffle Write(11 GB) are similar for spark 1.1 and 1.2. there are no Shuffle Read and Spill. The only difference is Duration DurationMin25th percentileMedian75th percentileMaxspark 1.24s37s45s53s1.9 minspark 1.12 s17 s18 s18 s34 s 2015-01-21 16:56 GMT+08:00 Sean Owen so...@cloudera.com: I mean that if you had tasks running on 10 machines now instead of 3 for some reason you would have more than 3 times the read load on your source of data all at once. Same if you made more executors per machine. But from your additional info it does not sound like this is the case. I think you need more debugging to pinpoint what is slower. On Jan 21, 2015 9:30 AM, Fengyun RAO raofeng...@gmail.com wrote: thanks, Sean. I don't quite understand you have *more *partitions across *more * workers. It's within the same cluster, and the same data, thus I think the same partition, the same workers. we switched from spark 1.1 to 1.2, then it's 3x slower. (We upgrade from CDH 5.2.1 to CDH 5.3, hence spark 1.1 to 1.2, and found the problem. then we installed a standalone spark 1.1, stop the 1.2, run the same script, it's 3x faster. stop 1.1, start 1.2, 3x slower again) 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
Re: spark 1.2 three times slower than spark 1.1, why?
To force one instance per executor, you could explicitly subclass FlatMapFunction and have it lazy-create your parser in the subclass constructor. You might also want to try RDD#mapPartitions() (instead of RDD#flatMap() if you want one instance per partition. This approach worked well for me when I had a flat map function that used non-serializable native code / objects. FWIW RDD#flatMap() does not appear to have changed 1.1 - 1.2 (tho master has a slight refactor). Agree it's worth checking the number of partitions in your 1.1 vs 1.2 test. On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
Re: How to share a NonSerializable variable among tasks in the same worker node?
In case someone has the same problem: The singleton hack works for me sometimes, sometimes it doesn't in spark 1.2.0, that is, sometimes I get a NullPointerException. Anyway, if you really need to work with big indexes and you want to have the smallest amount of communication between master and nodes, as well as if you have RAM available just for one instance of the indexes data per machine, then I suggest you use spark with memcached. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21282.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark 1.2 three times slower than spark 1.1, why?
Thanks, Paul, I don’t understand how subclass FlatMapFunction helps, could you show a sample code? We need one instance per executor, not per partition, thus mapPartitions() doesn’t help. 2015-01-21 16:07 GMT+08:00 Paul Wais paulw...@gmail.com: To force one instance per executor, you could explicitly subclass FlatMapFunction and have it lazy-create your parser in the subclass constructor. You might also want to try RDD#mapPartitions() (instead of RDD#flatMap() if you want one instance per partition. This approach worked well for me when I had a flat map function that used non-serializable native code / objects. FWIW RDD#flatMap() does not appear to have changed 1.1 - 1.2 (tho master has a slight refactor). Agree it's worth checking the number of partitions in your 1.1 vs 1.2 test. On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot be a broadcast, what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node. If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor. 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. note: our program in spark 1.1 has successfully processed a whole year data, quite stable. the main script is as below sc.textFile(inputPath) .flatMap(line = LogParser.parseLine(line)) .groupByKey(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex(...) .foreach(_ = {}) where LogParser is a singleton which may take some time to initialized and is shared across the execuator. the flatMap stage is 3x slower. We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but didn’t help. May somebody explain possible causes, or what should we test or change to find it out
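For the archive, the per-executor singleton that Sean and Paul describe is usually written like this in Scala; a sketch in which LogParser stands for the existing class from the thread:

    object LogParserHolder {
      // A Scala object lives once per JVM, so the parser is created at most once
      // per executor, on first access, and is shared by every task running there.
      lazy val parser = new LogParser()
    }

    sc.textFile(inputPath)
      .flatMap(line => LogParserHolder.parser.parseLine(line))

Referencing LogParserHolder.parser inside the closure compiles to a static access, so only the call is shipped to the executors, never the parser itself; that is why it does not need to be serializable.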
Re: spark-submit --py-files remote: Only local additional python files are supported
Thank you Andrew for your reply! I am very interested in having this feature. It is possible to run PySpark on AWS EMR in client mode (https://aws.amazon.com/articles/4926593393724923), but that kills the whole idea of running batch jobs in EMR on PySpark. Could you please (help to) create a task (with some details of a possible implementation) for this feature? I'd like to implement that but I'm too new to Spark to know how to do it in a good way... -Vladimir On Tue, Jan 20, 2015 at 8:40 PM, Andrew Or and...@databricks.com wrote: Hi Vladimir, Yes, as the error message suggests, PySpark currently only supports local files. This does not mean it only runs in local mode, however; you can still run PySpark on any cluster manager (though only in client mode). All this means is that your python files must be on your local file system. Until this is supported, the straightforward workaround then is to just copy the files to your local machine. -Andrew 2015-01-20 7:38 GMT-08:00 Vladimir Grigor vladi...@kiosked.com: Hi all! I found this problem when I tried running python application on Amazon's EMR yarn cluster. It is possible to run bundled example applications on EMR but I cannot figure out how to run a little bit more complex python application which depends on some other python scripts. I tried adding those files with '--py-files' and it works fine in local mode but it fails and gives me following message when run in EMR: Error: Only local python files are supported: s3://pathtomybucket/mylibrary.py. Simplest way to reproduce in local: bin/spark-submit --py-files s3://whatever.path.com/library.py main.py Actual commands to run it in EMR #launch cluster aws emr create-cluster --name SparkCluster --ami-version 3.3.1 --instance-type m1.medium --instance-count 2 --ec2-attributes KeyName=key20141114 --log-uri s3://pathtomybucket/cluster_logs --enable-debugging --use-default-roles --bootstrap-action Name=Spark,Path=s3://pathtomybucket/bootstrap-actions/spark/install-spark,Args=[-s, http://pathtomybucket/bootstrap-actions/spark ,-l,WARN,-v,1.2,-b,2014121700,-x] #{ # ClusterId: j-2Y58DME79MPQJ #} #run application aws emr add-steps --cluster-id j-2Y58DME79MPQJ --steps ActionOnFailure=CONTINUE,Name=SparkPy,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,cluster,--master,yarn-cluster,--py-files,s3://pathtomybucket/tasks/demo/main.py,main.py] #{ #StepIds: [ #s-2UP4PP75YX0KU #] #} And in stderr of that step I get Error: Only local python files are supported: s3://pathtomybucket/tasks/demo/main.py. What is the workaround or correct way to do it? Using hadoop's distcp to copy dependency files from s3 to nodes as another pre-step? Regards, Vladimir
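Until remote files are supported, the copy-first workaround can itself be scripted in the spirit of the commands above; a sketch with the bucket and paths as placeholders, remembering that PySpark on 1.2 runs on YARN in client mode only:

    aws s3 cp s3://pathtomybucket/tasks/demo/main.py /home/hadoop/
    aws s3 cp s3://pathtomybucket/mylibrary.py /home/hadoop/
    /home/hadoop/spark/bin/spark-submit --master yarn-client \
      --py-files /home/hadoop/mylibrary.py /home/hadoop/main.py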
Re: Closing over a var with changing value in Streaming application
Hi, On Wed, Jan 21, 2015 at 9:13 PM, Bob Tiernay btier...@hotmail.com wrote: Maybe I'm misunderstanding something here, but couldn't this be done with broadcast variables? I think there is the following caveat from the docs: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later) Well, I think I need a modifiable state (modifiable = changes once per interval) that stores the total number of items seen so far in the lifetime of my application, and I need this number on each executor. Since this number changes after every interval processed, I think broadcast variables are probably not appropriate in this case. Thanks Tobias
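One pattern consistent with that caveat is to never mutate a broadcast but to create a fresh one per interval from inside foreachRDD, which runs on the driver. A sketch, where stream is a placeholder DStream:

    var totalSoFar = 0L                                      // driver-side running count
    stream.foreachRDD { rdd =>
      val snapshot = rdd.sparkContext.broadcast(totalSoFar)  // new immutable broadcast per batch
      rdd.foreachPartition { items =>
        val seenSoFar = snapshot.value                       // read on the executors
        items.foreach { item => /* ... use seenSoFar ... */ }
      }
      totalSoFar += rdd.count()                              // update state for the next interval
    }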
Possible to restart (or stop and create) a StreamingContext
hi all, I have been experimenting with creating a sparkcontext - streamingcontext - a few streams - starting - stopping - creating new streams - starting a new (or the existing) streamingcontext with the new streams (I need to keep the existing sparkcontext alive as it would run other spark jobs). I ran into a few problems:
- I cannot seem to create a new streaming context after another one was shut down. I get this error: 15/01/21 12:43:16 INFO MetricsSystem: Metrics already registered java.lang.IllegalArgumentException: A metric named app-20150121123832-0008.driver.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processStartTime already exists
- Or if I try to start the one that was stopped I get this: org.apache.spark.SparkException: StreamingContext has already been started
- It seems even after the streaming context is stopped, it still shows up in the job info (spark web UI).
Is there a better way to do this? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Possible-to-restart-or-stop-and-create-a-StreamingContext-tp21291.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
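For reference, the stop-and-recreate pattern itself looks like this; a sketch assuming Spark 1.2, and as the post above shows, reusing the same application can still trip over the already-registered streaming metrics:

    ssc.stop(stopSparkContext = false, stopGracefully = true) // keep the shared sc alive
    val ssc2 = new StreamingContext(sc, Seconds(2))           // a brand-new context, not a restart
    // ... define the new streams on ssc2, then:
    ssc2.start()
    ssc2.awaitTermination()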
Re: Discourse: A proposed alternative to the Spark User list
I think this is a really great idea for really opening up the discussions that happen here. Also, it would be nice to know why there doesn't seem to be much interest. Maybe I'm misunderstanding some nuance of Apache projects. Cheers -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21288.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)
The SparkContext is lost when I call the persist function from the sink function, just before the function call... everything works as intended so I guess is the FunctionN class serialisation what it's causing the problem. I will try to embed the functionality in the sink method to verify that. 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: The following functions, def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { data.foreachRDD { rdd = rdd.cache() val (minTime, maxTime): (Long, Long) = rdd.map { case (_, ((TimeSeriesKey(_, time), _), _)) = (time, time) }.fold((Long.MaxValue, Long.MinValue)) { case ((min, max), (num, _)) = (math.min(min, num), math.max(max, num)) } if (minTime != Long.MaxValue maxTime != Long.MinValue) { rdd.map(_._1).distinct().foreach { case (game, category) = persist(game, category, minTime, maxTime, rdd) } } rdd.unpersist(blocking = false) } } def persist(game: GameID, category: Category, min: Long, max: Long, data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { val family: String = s${parameters.table.family}_$ {game.repr}_${category.repr} val cas: CassandraRDD[(Long, Long, String, Array[Byte])] = data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family) val fil: RDD[((TimeSeriesKey, Platform), HLL)] = cas .where(time = ?, new Date(min)) .where(time = ?, new Date(max)) .map { case (date, time, platform, array) = ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array)) } data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map { case ((key, platform), (value, maybe)) = (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array = value + array))) }.saveToCassandra(parameters.table.keyspace, family) } are causing this exception at runtime: 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126) java.lang.NullPointerException at com.datastax.spark.connector.SparkContextFunctions. cassandraTable$default$3(SparkContextFunctions.scala:47) at com.mindcandy.services.mako.concurrentusers. ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51) at com.mindcandy.services.mako.concurrentusers. ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply( ActiveUsersJobImpl.scala:41) at com.mindcandy.services.mako.concurrentusers. ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply( ActiveUsersJobImpl.scala:40) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD. scala:759) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD. scala:759) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply( SparkContext.scala:1143) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply( SparkContext.scala:1143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask. scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run( Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of SparkContextFunctions.scala is the implicit CassandraConnector that uses the underlying spark context to retrieve the SparkConf. 
After a few hours debugging the code, the source of the problem is that data.sparkContext is returning null. It seems that the RDD is serialised and the SparkContext is lost. Is this the expected behaviour? Is it a known bug? I have run out of ideas on how to make this work, so I'm open to suggestions. Kind regards, Luis
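One way out of this particular NPE, as a sketch based on the code above: rdd.map(_._1).distinct().foreach runs its closure in tasks on the executors, where the deserialized RDD carries no SparkContext, so collecting the (presumably few) distinct keys first moves the persist calls back to the driver, where data.sparkContext is valid:

    if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
      rdd.map(_._1).distinct().collect().foreach {  // collect(): the keys are now driver-local
        case (game, category) => persist(game, category, minTime, maxTime, rdd)
      }
    }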
Re: ClosureCleaner should use ClassLoader created by SparkContext
Here is the stack trace for reference. Notice that this happens when the job spawns a new thread.

java.lang.ClassNotFoundException: com.myclass$$anonfun$8$$anonfun$9
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_71]
at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_71]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_71]
at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_71]
at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_71]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_71]
at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_71]
at java.lang.Class.forName0(Native Method) ~[na:1.7.0_71]
at java.lang.Class.forName(Class.java:274) ~[na:1.7.0_71]
at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:260) ~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) ~[com.esotericsoftware.reflectasm.reflectasm-1.07-shaded.jar:na]
at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) ~[com.esotericsoftware.reflectasm.reflectasm-1.07-shaded.jar:na]
at org.apache.spark.util.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:87) ~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:107) ~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) ~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at org.apache.spark.rdd.RDD.map(RDD.scala:271) ~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at com.myclass.com$myclass$$load(myclass.scala:375) ~[na:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) ~[org.scala-lang.scala-library-2.11.5.jar:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) ~[org.scala-lang.scala-library-2.11.5.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]

On Wed Jan 21 2015 at 17:34:34 Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: While implementing a spark server, I realized that the thread's context classloader must be set to any dynamically loaded classloader so that ClosureCleaner can do its thing. Should the ClosureCleaner not use the classloader created by SparkContext (which has all dynamically added jars via SparkContext.addJar) instead of using Thread.currentThread.getContextClassLoader while looking up classes in InnerClosureFinder? Thanks, Aniket
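A sketch of the workaround implied above, where jobClassLoader is a hypothetical name for whatever loader your server used to load the dynamically added job classes, and rdd is a placeholder: set the context classloader of any thread you spawn before that thread touches Spark:

    val task = new Runnable {
      override def run(): Unit = {
        // Hypothetical: the loader that knows about the dynamically loaded job jars.
        Thread.currentThread().setContextClassLoader(jobClassLoader)
        // ClosureCleaner can now resolve the anonymous function classes created here.
        rdd.map(x => x).count()
      }
    }
    new Thread(task).start()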
NullPointer when access rdd.sparkContext (Spark 1.1.1)
The following functions,

    def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
      data.foreachRDD { rdd =>
        rdd.cache()
        val (minTime, maxTime): (Long, Long) = rdd.map {
          case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
        }.fold((Long.MaxValue, Long.MinValue)) {
          case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
        }
        if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
          rdd.map(_._1).distinct().foreach {
            case (game, category) => persist(game, category, minTime, maxTime, rdd)
          }
        }
        rdd.unpersist(blocking = false)
      }
    }

    def persist(game: GameID, category: Category, min: Long, max: Long,
                data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
      val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
      val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
        data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
      val fil: RDD[((TimeSeriesKey, Platform), HLL)] = cas
        .where("time >= ?", new Date(min))
        .where("time <= ?", new Date(max))
        .map { case (date, time, platform, array) =>
          ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
        }
      data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
        case ((key, platform), (value, maybe)) =>
          (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
      }.saveToCassandra(parameters.table.keyspace, family)
    }

are causing this exception at runtime: 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126) java.lang.NullPointerException at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of SparkContextFunctions.scala is the implicit CassandraConnector that uses the underlying Spark context to retrieve the SparkConf. After a few hours debugging the code, the source of the problem is that data.sparkContext is returning null. It seems that the RDD is serialised and the SparkContext is lost. Is this the expected behaviour? Is it a known bug? I have run out of ideas on how to make this work, so I'm open to suggestions. Kind regards, Luis
Re: Error for first run from iPython Notebook
Is this the wrong list to be asking this question? I'm not even sure where to start troubleshooting. On Tue, Jan 20, 2015 at 9:48 AM, Dave dla...@gmail.com wrote: Not sure if anyone who can help has seen this. Any suggestions would be appreciated, thanks! On Mon Jan 19 2015 at 1:50:43 PM Dave dla...@gmail.com wrote: Hi, I've set up my first Spark cluster (1 master, 2 workers) and an iPython notebook server that I'm trying to set up to access the cluster. I'm running the workers from Anaconda to make sure the Python setup is correct on each box. The iPy notebook server appears to have everything set up correctly, and I'm able to initialize Spark and push a job out. However, the job is failing, and I'm not sure how to troubleshoot. Here's the code:

    from pyspark import SparkContext
    CLUSTER_URL = 'spark://192.168.1.20:7077'
    sc = SparkContext(CLUSTER_URL, 'pyspark')

    def sample(p):
        x, y = random(), random()
        return 1 if x*x + y*y < 1 else 0

    count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
    print "Pi is roughly %f" % (4.0 * count / 20)

And here's the error: Py4JJavaError Traceback (most recent call last) <ipython-input-4-e8dce94b43bb> in <module>() 3 return 1 if x*x + y*y < 1 else 0 4 5 count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b) 6 print "Pi is roughly %f" % (4.0 * count / 20) /opt/spark-1.2.0/python/pyspark/rdd.pyc in reduce(self, f) 713 yield reduce(f, iterator, initial) 714 --> 715 vals = self.mapPartitions(func).collect() 716 if vals: 717 return reduce(f, vals) /opt/spark-1.2.0/python/pyspark/rdd.pyc in collect(self) 674 675 with SCCallSiteSync(self.context) as css: --> 676 bytesInJava = self._jrdd.collect().iterator() 677 return list(self._collect_iterator_through_file(bytesInJava)) 678 /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, --> 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. --> 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o28.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 0.0 failed 4 times, most recent failure: Lost task 31.3 in stage 0.0 (TID 72, 192.168.1.21): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/spark-1.2.0/python/pyspark/worker.py", line 107, in main process() File "/opt/spark-1.2.0/python/pyspark/worker.py", line 98, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/spark-1.2.0/python/pyspark/serializers.py", line 227, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/opt/spark-1.2.0/python/pyspark/rdd.py", line 710, in func initial = next(iterator) File "<ipython-input-4-e8dce94b43bb>", line 2, in sample TypeError: 'module' object is not callable at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
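A note not in the original thread, but the worker traceback above points at a likely culprit: sample calls random() while the TypeError says 'module' object is not callable, which is what happens when the notebook ran import random and then called the module directly. Using from random import random, or qualifying the call as random.random(), would avoid that; if so, this is a plain Python error shipped to the workers rather than a cluster setup problem.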
Re: Possible to restart (or stop and create) a StreamingContext
Just found this in the documentation: "A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created." In this case, I assume the error I reported above is a bug. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Possible-to-restart-or-stop-and-create-a-StreamingContext-tp21291p21294.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
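A minimal sketch of the documented pattern quoted above; the app name and batch intervals are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc = new SparkContext(new SparkConf().setAppName("restart-demo"))

    var ssc = new StreamingContext(sc, Seconds(1))
    // ... define streams, ssc.start(), run for a while ...
    ssc.stop(stopSparkContext = false) // stop streaming but keep the SparkContext alive

    // A fresh StreamingContext on the same SparkContext is now legal
    ssc = new StreamingContext(sc, Seconds(5))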
Re: Finding most occurrences in a JSON Nested Array
Send me the current code here; I will fix it and send it back to you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p21295.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [SparkSQL] Try2: Parquet predicate pushdown troubles
Thanks for looking, Cheng. Just to clarify in case other people need this sooner: setting sc.hadoopConfiguration.set("parquet.task.side.metadata", "false") did work well in terms of dropping row groups/showing small input size. What was odd is that the overall time wasn't much better... but maybe that was overhead from sending the metadata client side. Thanks again and looking forward to your fix. On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Yana, Sorry for the late reply, missed this important thread somehow. And many thanks for reporting this. It turned out to be a bug: filter pushdown is only enabled when using client-side metadata, which is not expected, because the task-side metadata code path is more performant. And I guess the reason why setting parquet.task.side.metadata to false didn't reduce input size for you is that you set the configuration with the Spark API, or put it into spark-defaults.conf. This configuration goes to the Hadoop Configuration, and Spark only merges properties whose names start with spark.hadoop into Hadoop Configuration instances. You may try to put the parquet.task.side.metadata config into Hadoop core-site.xml and then re-run the query; I can see significant differences by doing so. I'll open a JIRA and deliver a fix for this ASAP. Thanks again for reporting all the details! Cheng On 1/13/15 12:56 PM, Yana Kadiyska wrote: Attempting to bump this up in case someone can help out after all. I spent a few good hours stepping through the code today, so I'll summarize my observations, both in hope I get some help and to help others that might be looking into this: 1. I am setting spark.sql.parquet.filterPushdown=true 2. I can see by stepping through the driver debugger that ParquetTableOperations.execute sets the filters via ParquetInputFormat.setFilterPredicate (I checked the conf object, things appear OK there) 3. In FilteringParquetRowInputFormat, I go through the code path for getTaskSideSplits. It seems that the code path for getClientSideSplits would try to drop row groups, but I don't see anything similar in getTaskSideSplits. Does anyone have pointers on where to look after this? Where does row-group filtering happen in the case of getTaskSideSplits? I can attach to the executor but am not quite sure what Parquet-related code gets called executor side... I also don't see any messages in the executor logs related to filtering predicates. For comparison, I went through getClientSideSplits and can see that predicate pushdown works OK: sc.hadoopConfiguration.set("parquet.task.side.metadata", "false") 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800) 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %)! Is it possible that this is just a UI bug? I can see Input=4G when using (parquet.task.side.metadata, false) and Input=140G when using (parquet.task.side.metadata, true), but the runtimes are very comparable? JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I am running the following (connecting to an external Hive Metastore): /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf spark.sql.parquet.filterPushdown=true val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) and then ran two queries: sqlContext.sql("select count(*) from table where partition='blah'") and sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604") According to the Input tab in the UI, both scan about 140G of data, which is the size of my whole partition. So I have two questions: 1. Is there a way to tell from the plan whether a predicate pushdown is supposed to happen? I see this for the second query: res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L] Exchange SinglePartition Aggregate true, [], [COUNT(1) AS PartialCount#49L] OutputFaker [] Project [] ParquetTableScan [epoch#139L], (ParquetRelation <list of hdfs files>) 2. Am I doing something obviously wrong that this is not working? (I'm guessing it's not working because the input size for the second query shows unchanged and the execution time is almost 2x as long.) Thanks in advance for any insights.
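A small sketch of the two configuration routes implied by Cheng's explanation above. The spark.hadoop. prefix trick follows directly from his note that only spark.hadoop.-prefixed properties are merged into Hadoop Configuration instances; treat it as an assumption to verify, not a documented recipe:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("parquet-pushdown-test")
      .set("spark.sql.parquet.filterPushdown", "true")
      // merged into the Hadoop Configuration because of the spark.hadoop. prefix
      .set("spark.hadoop.parquet.task.side.metadata", "false")
    val sc = new SparkContext(conf)

    // Equivalent to what Yana did directly on the context:
    // sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")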
ClosureCleaner should use ClassLoader created by SparkContext
While implementing a Spark server, I realized that the thread's context classloader must be set to any dynamically loaded classloader so that ClosureCleaner can do its thing. Should the ClosureCleaner not use the classloader created by SparkContext (which has all jars dynamically added via SparkContext.addJar) instead of using Thread.currentThread.getContextClassLoader when looking up classes in InnerClosureFinder? Thanks, Aniket
RE: Closing over a var with changing value in Streaming application
Maybe I'm misunderstanding something here, but couldn't this be done with broadcast variables? I see there is the following caveat from the docs: "In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later)" But isn't this exactly the semantics you want (i.e. not the same value)? Date: Wed, 21 Jan 2015 21:02:31 +0900 Subject: Re: Closing over a var with changing value in Streaming application From: t...@preferred.jp To: ak...@sigmoidanalytics.com CC: user@spark.apache.org Hi again, On Wed, Jan 21, 2015 at 4:53 PM, Tobias Pfeiffer t...@preferred.jp wrote: On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: How about using accumulators? As far as I understand, they solve the part of the problem that I am not worried about, namely increasing the counter. I was more worried about getting that counter/accumulator value back to the executors. Uh, I may have been a bit quick here... So I had this one working: var totalNumberOfItems = 0L // update the keys of the stream data val globallyIndexedItems = inputStream.map(keyVal => (keyVal._1 + totalNumberOfItems, keyVal._2)) // increase the number of total seen items inputStream.foreachRDD(rdd => { totalNumberOfItems += rdd.count }) and used the dstream.foreachRDD(rdd => someVar += rdd.count) pattern in a number of places. Then, however, I added a dstream.transformWith(otherDStream, func) call, which somehow changed the order in which the DStreams are computed. In particular, suddenly some of my DStream values were computed before the foreachRDD calls that set the proper variables were executed, which led to completely unpredictable behavior. So, especially looking at the existence of spark.streaming.concurrentJobs, I suddenly feel like none of the DStream computations done on executors should depend on the ordering of output operations done on the driver. (And I am afraid this includes accumulator updates.) Thinking about this, I feel I don't even know how I can realize a globally (over the lifetime of my stream) increasing ID in my DStream. Do I need something like val counts: DStream[(Int, Long)] = stream.count().map((1, _)).updateStateByKey(...) with a pseudo-key just to keep a tiny bit of state from one interval to the next? Really thankful for any insights, Tobias
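A minimal sketch of the updateStateByKey idea Tobias floats at the end, assuming a checkpointed StreamingContext and using the constant key 1 as the pseudo-key; the input type and names are illustrative:

    import org.apache.spark.streaming.StreamingContext._ // pair-DStream ops on older Spark versions
    import org.apache.spark.streaming.dstream.DStream

    // updateStateByKey requires checkpointing: ssc.checkpoint("/tmp/stream-checkpoint")

    def runningItemCount(inputStream: DStream[(String, String)]): DStream[(Int, Long)] =
      inputStream.count()                    // items in the current batch
        .map(batchCount => (1, batchCount))  // single pseudo-key
        .updateStateByKey[Long] { (batchCounts: Seq[Long], state: Option[Long]) =>
          // carry the total over the stream's lifetime from one interval to the next
          Some(state.getOrElse(0L) + batchCounts.sum)
        }

This keeps the counter inside the DStream lineage itself, so downstream computations no longer depend on the ordering of driver-side output operations.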
Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)
Yes, I have just found that. By replacing rdd.map(_._1).distinct().foreach { case (game, category) => persist(game, category, minTime, maxTime, rdd) } with rdd.map(_._1).distinct().collect().foreach { case (game, category) => persist(game, category, minTime, maxTime, rdd) } everything works as expected. 2015-01-21 14:18 GMT+00:00 Sean Owen so...@cloudera.com: It looks like you are trying to use the RDD in a distributed operation, which won't work. The context will be null. On Jan 21, 2015 1:50 PM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: The SparkContext is lost when I call the persist function from the sink function; just before the function call everything works as intended, so I guess the FunctionN class serialisation is what's causing the problem. I will try to embed the functionality in the sink method to verify that. 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: The following functions, def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { data.foreachRDD { rdd => rdd.cache() val (minTime, maxTime): (Long, Long) = rdd.map { case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time) }.fold((Long.MaxValue, Long.MinValue)) { case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num)) } if (minTime != Long.MaxValue && maxTime != Long.MinValue) { rdd.map(_._1).distinct().foreach { case (game, category) => persist(game, category, minTime, maxTime, rdd) } } rdd.unpersist(blocking = false) } } def persist(game: GameID, category: Category, min: Long, max: Long, data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}" val cas: CassandraRDD[(Long, Long, String, Array[Byte])] = data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family) val fil: RDD[((TimeSeriesKey, Platform), HLL)] = cas.where("time >= ?", new Date(min)).where("time <= ?", new Date(max)).map { case (date, time, platform, array) => ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array)) } data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map { case ((key, platform), (value, maybe)) => (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array))) }.saveToCassandra(parameters.table.keyspace, family) } are causing this exception at runtime: 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126) java.lang.NullPointerException at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of SparkContextFunctions.scala is the implicit CassandraConnector that uses the underlying Spark context to retrieve the SparkConf. After a few hours debugging the code, the source of the problem is that data.sparkContext is returning null. It seems that the RDD is serialised and the SparkContext is lost. Is this the expected behaviour? Is it a known bug? I have run out of ideas on how to make this work, so I'm open to suggestions. Kind regards, Luis
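A condensed sketch of the rule Sean states, with illustrative names; the second form is what made the job above work:

    import org.apache.spark.rdd.RDD

    def persistFor(key: String, data: RDD[(String, Long)]): Unit = { /* driver-side job per key */ }

    def sinkKeys(data: RDD[(String, Long)]): Unit = {
      // WRONG: the closure passed to foreach runs on executors, where the
      // captured RDD deserialises with a null SparkContext, so persistFor fails
      // data.map(_._1).distinct().foreach(key => persistFor(key, data))

      // RIGHT: collect the (small) key set back to the driver first, then
      // launch one distributed job per key from the driver thread
      data.map(_._1).distinct().collect().foreach(key => persistFor(key, data))
    }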
Re: Discourse: A proposed alternative to the Spark User list
It's a very valid idea indeed, but... it's a tricky subject, since the entire ASF is run on mailing lists; hence there are so many different but equally sound ways of looking at this idea, which conflict with one another. On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote: I think this is a really great idea for really opening up the discussions that happen here. Also, it would be nice to know why there doesn't seem to be much interest. Maybe I'm misunderstanding some nuance of Apache projects. Cheers -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21288.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Broadcast variable questions
Hi, Spark 1.2.0, standalone, local mode (for testing). Here are several questions on broadcast variables: 1) Where is a broadcast variable cached on executors? In memory or on disk? I read somewhere that these variables are stored in spark.local.dir, but I can't find any info in the Spark 1.2 documentation. I encountered a problem with broadcast variables: I have a loop in which a broadcast variable is created; after 3 iterations, the used memory increased quickly until full, and Spark blocked, with no error message, no exception, just blocked. I would like to make sure whether it is caused by too many broadcast variables, because I did not call unpersist() on each broadcast variable. 2) I find that a broadcast variable has destroy() and unpersist() methods; what's the difference between them? If a broadcast variable is destroyed, is it removed from where it is stored? Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variable-questions-tp21292.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
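A minimal sketch of the unpersist-per-iteration pattern the question hints at; the loop body and lookup table are illustrative, and whether this cures the memory growth is exactly what one would be testing:

    import org.apache.spark.SparkContext

    def runIterations(sc: SparkContext, numIterations: Int): Unit = {
      val data = sc.parallelize(1 to 1000000)
      for (i <- 0 until numIterations) {
        // hypothetical per-iteration lookup table
        val table: Map[Int, Int] = Map(i -> i * 2)
        val bc = sc.broadcast(table)
        val hits = data.map(x => bc.value.getOrElse(x % 10, 0)).reduce(_ + _)
        println(s"iteration $i: $hits")
        // release executor-side copies before the next broadcast is created;
        // unpersist() keeps the driver copy (the variable can be re-sent if
        // used again), while destroy() removes everything and makes the
        // variable unusable afterwards
        bc.unpersist(blocking = true)
      }
    }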
Confused about shuffle read and shuffle write
I have the following code in a Spark job:

    // Get the baseline input file(s)
    JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
        sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
    JavaPairRDD<String, String> hsfBaselinePairRDD =
        hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes())
            .partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS))
            .persist(StorageLevel.MEMORY_ONLY_SER());

    // Use 'substring' to extract epoch values.
    JavaPairRDD<String, Long> baselinePairRDD =
        hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords))
            .persist(StorageLevel.MEMORY_ONLY_SER());

When looking at the STAGE information for my job, I notice the following: to construct hsfBaselinePairRDD, it takes about 7.5 minutes, with 931GB of input (from S3) and 377GB of shuffle write (presumably because of the hash partitioning). This all makes sense. To construct baselinePairRDD, it also takes about 7.5 minutes. I thought that was a bit odd, but what I thought was really odd is that there was also 330GB of shuffle read in this stage; I would have thought there should be 0 shuffle read here. What I'm confused about is why there is any 'shuffle read' at all when constructing baselinePairRDD. If anyone could shed some light on this it would be appreciated. Thanks. Darin.
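A possible reading, not from the original thread: partitionBy introduces a stage boundary. The 377GB shuffle write belongs to the stage that reads S3 and runs the map side of the hash partitioning; the stage the UI attributes to baselinePairRDD starts on the reduce side of that same shuffle, so its first step is fetching the shuffled blocks, hence the ~330GB shuffle read. mapValues itself preserves the partitioning and adds no shuffle of its own.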
Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)
It looks like you are trying to use the RDD in a distributed operation, which won't work. The context will be null. On Jan 21, 2015 1:50 PM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: The SparkContext is lost when I call the persist function from the sink function; just before the function call everything works as intended, so I guess the FunctionN class serialisation is what's causing the problem. I will try to embed the functionality in the sink method to verify that. 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: The following functions, def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { data.foreachRDD { rdd => rdd.cache() val (minTime, maxTime): (Long, Long) = rdd.map { case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time) }.fold((Long.MaxValue, Long.MinValue)) { case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num)) } if (minTime != Long.MaxValue && maxTime != Long.MinValue) { rdd.map(_._1).distinct().foreach { case (game, category) => persist(game, category, minTime, maxTime, rdd) } } rdd.unpersist(blocking = false) } } def persist(game: GameID, category: Category, min: Long, max: Long, data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}" val cas: CassandraRDD[(Long, Long, String, Array[Byte])] = data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family) val fil: RDD[((TimeSeriesKey, Platform), HLL)] = cas.where("time >= ?", new Date(min)).where("time <= ?", new Date(max)).map { case (date, time, platform, array) => ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array)) } data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map { case ((key, platform), (value, maybe)) => (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array))) }.saveToCassandra(parameters.table.keyspace, family) } are causing this exception at runtime: 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126) java.lang.NullPointerException at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41) at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of SparkContextFunctions.scala is the implicit CassandraConnector that uses the underlying Spark context to retrieve the SparkConf. After a few hours debugging the code, the source of the problem is that data.sparkContext is returning null. It seems that the RDD is serialised and the SparkContext is lost. Is this the expected behaviour? Is it a known bug? I have run out of ideas on how to make this work, so I'm open to suggestions. Kind regards, Luis
sparkcontext.objectFile return thousands of partitions
Why does sc.objectFile(...) return an RDD with thousands of partitions? I save an RDD to the file system using rdd.saveAsObjectFile("file:///tmp/mydir"). Note that the RDD contains 7 million objects. I checked the directory /tmp/mydir/; it contains 8 partitions: part-0 part-2 part-4 part-6 _SUCCESS part-1 part-3 part-5 part-7 I then load the RDD back using val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8) I expect rdd2 to have 8 partitions, but from the master UI I see that rdd2 has over 1000 partitions. This is very inefficient. How can I limit it to 8 partitions, just like what is stored on the file system? Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541