CVE-2019-10099: Apache Spark unencrypted data on local disk
Severity: Important

Vendor: The Apache Software Foundation

Versions affected:
All Spark 1.x, Spark 2.0.x, Spark 2.1.x, and 2.2.x versions
Spark 2.3.0 to 2.3.2

Description:
Prior to Spark 2.3.3, in certain situations Spark would write user data to local disk unencrypted, even if spark.io.encryption.enabled=true. This includes cached blocks that are fetched to disk (controlled by spark.maxRemoteBlockSizeFetchToMem); in SparkR, using parallelize; in PySpark, using broadcast and parallelize; and use of Python UDFs.

Mitigation:
1.x, 2.0.x, 2.1.x, 2.2.x, and 2.3.x users should upgrade to 2.3.3 or newer, including 2.4.x.

Credit:
This issue was reported by Thomas Graves of NVIDIA.

References:
https://spark.apache.org/security.html
https://issues.apache.org/jira/browse/SPARK-28626
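To make the affected-version list above easier to apply mechanically, here is a small, hypothetical helper (not part of any Spark tooling) that checks a version string against the ranges in this advisory:

```python
def affected_by_cve_2019_10099(version: str) -> bool:
    """Return True if the given Spark version falls in this advisory's
    affected ranges: all 1.x, 2.0.x, 2.1.x, 2.2.x, and 2.3.0 to 2.3.2."""
    major, minor, patch = (int(p) for p in version.split(".")[:3])
    if major == 1:
        return True
    if major == 2:
        if minor <= 2:
            return True
        if minor == 3:
            return patch <= 2  # fixed in 2.3.3
    return False

print(affected_by_cve_2019_10099("2.3.2"))  # True -- fixed only in 2.3.3+
print(affected_by_cve_2019_10099("2.4.0"))  # False
```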
Re: [SHUFFLE]FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
We haven't seen many of these, but we have seen it a couple of times -- there is ongoing work under SPARK-26089 to address the issue we know about, namely that we don't detect corruption in large shuffle blocks. Do you believe the cases you have match that -- does it appear to be corruption in large shuffle blocks? Or do you not have compression or encryption enabled? Both the prior solution and the work under SPARK-26089 only work if either one of those is enabled.

On Tue, Mar 12, 2019 at 9:36 AM Vadim Semenov wrote:
> I/We have seen this error before on 1.6, but ever since we upgraded to 2.1
> two years ago we haven't seen it.
>
> On Tue, Mar 12, 2019 at 2:19 AM wangfei wrote:
>> Hi all,
>> Non-deterministic FAILED_TO_UNCOMPRESS(5) or 'Stream is corrupted' errors
>> may occur during shuffle read, as described in this JIRA:
>> https://issues.apache.org/jira/browse/SPARK-4105
>> There have been no new comments in this JIRA for a long time. So, has
>> anyone seen these errors in a recent version, such as Spark 2.3?
>> Can anyone provide a reproducible case or analyze the cause of these
>> errors?
>> Thanks.
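As an illustration of why such corruption only surfaces as an error when compression (or encryption) is enabled: a compressed stream has internal structure and a checksum, so flipped bytes typically make decompression fail loudly, whereas corrupt uncompressed bytes are accepted silently. A small sketch using Python's zlib (purely to illustrate the failure mode -- not Spark's actual codec):

```python
import zlib

payload = b"shuffle block bytes " * 100
compressed = bytearray(zlib.compress(payload))

# Corrupt a byte in the middle of the compressed stream.
compressed[len(compressed) // 2] ^= 0xFF

try:
    zlib.decompress(bytes(compressed))
    print("decompressed OK")
except zlib.error as e:
    # The corruption is detected at decompression time.
    print("corruption detected:", e)
```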
Re: CVE-2018-11760: Apache Spark local privilege escalation vulnerability
I received some questions about what the exact change was which fixed the issue, and the PMC decided to post info in JIRA to make it easier for the community to track. The relevant details are all on https://issues.apache.org/jira/browse/SPARK-26802

On Mon, Jan 28, 2019 at 1:08 PM Imran Rashid wrote:
> Severity: Important
>
> Vendor: The Apache Software Foundation
>
> Versions affected:
> All Spark 1.x, Spark 2.0.x, and Spark 2.1.x versions
> Spark 2.2.0 to 2.2.2
> Spark 2.3.0 to 2.3.1
>
> Description:
> When using PySpark, it's possible for a different local user to connect
> to the Spark application and impersonate the user running the Spark
> application. This affects versions 1.x, 2.0.x, 2.1.x, 2.2.0 to 2.2.2,
> and 2.3.0 to 2.3.1.
>
> Mitigation:
> 1.x, 2.0.x, 2.1.x, and 2.2.x users should upgrade to 2.2.3 or newer.
> 2.3.x users should upgrade to 2.3.2 or newer.
> Otherwise, affected users should avoid using PySpark in multi-user
> environments.
>
> Credit:
> This issue was reported by Luca Canali and Jose Carlos Luna Duran from
> CERN.
>
> References:
> https://spark.apache.org/security.html
CVE-2018-11760: Apache Spark local privilege escalation vulnerability
Severity: Important

Vendor: The Apache Software Foundation

Versions affected:
All Spark 1.x, Spark 2.0.x, and Spark 2.1.x versions
Spark 2.2.0 to 2.2.2
Spark 2.3.0 to 2.3.1

Description:
When using PySpark, it's possible for a different local user to connect to the Spark application and impersonate the user running the Spark application. This affects versions 1.x, 2.0.x, 2.1.x, 2.2.0 to 2.2.2, and 2.3.0 to 2.3.1.

Mitigation:
1.x, 2.0.x, 2.1.x, and 2.2.x users should upgrade to 2.2.3 or newer.
2.3.x users should upgrade to 2.3.2 or newer.
Otherwise, affected users should avoid using PySpark in multi-user environments.

Credit:
This issue was reported by Luca Canali and Jose Carlos Luna Duran from CERN.

References:
https://spark.apache.org/security.html
Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?
Serega, can you explain a bit more why you want this ability? If the node is really bad, wouldn't you want to decommission the NM entirely? If you've got heterogeneous resources, then node labels seem like they would be more appropriate -- and I don't feel great about adding workarounds for the node-label limitations into blacklisting. I don't want to be stuck supporting a configuration with too limited a use case.

(It may be better to move the discussion to https://issues.apache.org/jira/browse/SPARK-26688 so it's better archived; I'm responding here in case you aren't watching that issue.)

On Tue, Jan 22, 2019 at 6:09 AM Jörn Franke wrote:
> You can try with YARN node labels:
> https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeLabel.html
>
> Then you can whitelist nodes.
>
> On 19.01.2019 at 00:20, Serega Sheypak wrote:
>> Hi, is there any possibility to tell the Scheduler to blacklist specific
>> nodes in advance?
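For reference, once node labels are set up in YARN, Spark's YARN mode can target them through node-label-expression settings. A configuration sketch (the label name is a placeholder, and you should verify these properties are supported by your Spark/Hadoop versions):

```shell
spark-submit \
  --master yarn \
  --conf spark.yarn.am.nodeLabelExpression=goodnodes \
  --conf spark.yarn.executor.nodeLabelExpression=goodnodes \
  ...
```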
Re: Heap Memory in Spark 2.3.0
Perhaps this is https://issues.apache.org/jira/browse/SPARK-24578? That was reported as a performance issue, not OOMs, but it's in the exact same part of the code, and the change was to reduce the memory pressure significantly.

On Mon, Jul 16, 2018 at 1:43 PM, Bryan Jeffrey wrote:
> Hello.
>
> I am working to move our system from Spark 2.1.0 to Spark 2.3.0. Our
> system is running on Spark managed via Yarn. During the course of the move
> I mirrored the settings to our new cluster. However, on the Spark 2.3.0
> cluster with the same resource allocation I am seeing a number of executors
> die due to OOM:
>
> 18/07/16 17:23:06 ERROR YarnClusterScheduler: Lost executor 5 on wn80:
> Container killed by YARN for exceeding memory limits. 22.0 GB of 22 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
> I increased spark.driver.memoryOverhead and spark.executor.memoryOverhead
> from the default (384) to 2048. I went ahead and disabled the vmem and pmem
> YARN checks on the cluster.
> With that disabled I see the following error:
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1466)
>     at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1203)
>     at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:140)
>     at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
>     at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
>     at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
>     at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>     at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>     at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>     at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>     at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
>     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
>     at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
>     at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
>     at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041)
>     at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)
>     at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:222)
>     at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:146)
>     at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
>
> Looking at GC:
>
> [Eden: 16.0M(8512.0M)->0.0B(8484.0M) Survivors: 4096.0K->4096.0K Heap: 8996.7M(20.0G)->8650.3M(20.0G)]
> [Times: user=0.03 sys=0.01, real=0.01 secs]
> 794.949: [G1Ergonomics (Heap Sizing) attempt heap expansion, reason: allocation request failed, allocation request: 401255000 bytes]
> 794.949: [G1Ergonomics (Heap Sizing) expand the heap, requested expansion amount: 401255000 bytes, attempted expansion amount: 402653184 bytes]
> 794.949: [G1Ergonomics (Heap Sizing) did not expand the heap, reason: heap already fully expanded]
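For context on the "Consider boosting spark.yarn.executor.memoryOverhead" message in the thread above: YARN kills a container when executor heap plus off-heap overhead exceeds the container size it granted. In Spark 2.x the default overhead was max(384 MB, 10% of executor memory); a sketch of that sizing arithmetic (the 10%/384 MB figures match Spark 2.x defaults, but verify against your version):

```python
from typing import Optional

def default_memory_overhead_mb(executor_memory_mb: int) -> int:
    """Spark 2.x default: overhead = max(384 MB, 10% of executor memory)."""
    return max(384, int(executor_memory_mb * 0.10))

def yarn_container_request_mb(executor_memory_mb: int,
                              overhead_mb: Optional[int] = None) -> int:
    """Total memory the YARN container must hold: heap + off-heap overhead."""
    if overhead_mb is None:
        overhead_mb = default_memory_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb

# A 20 GB executor heap plus the 2048 MB overhead set in the thread above
# yields exactly the 22 GB container limit YARN reported:
print(yarn_container_request_mb(20 * 1024, overhead_mb=2048))  # 22528 MB == 22 GB
```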
Re: Spark Job Hangs on our production cluster
Just looking at the thread dump from your original email, the 3 executor threads are all trying to load classes. (One thread is actually loading some class, and the others are blocked waiting to load a class, most likely trying to load the same thing.) That is really weird, definitely not something which should keep things blocked for 30 min. It suggests something wrong w/ the JVM, or classpath configuration, or a combination. Looks like you are trying to run in the repl, and for whatever reason the http server for the repl to serve classes is not responsive. I'd try running outside of the repl and see if that works.

Sorry, not a full diagnosis, but maybe this'll help a bit.

On Tue, Aug 11, 2015 at 3:19 PM, java8964 <java8...@hotmail.com> wrote:
> Currently we have an IBM BigInsight cluster with 1 namenode + 1 JobTracker
> + 42 data/task nodes, running BigInsight V3.0.0.2, corresponding to Hadoop
> 2.2.0 with MR1. Since IBM BigInsight doesn't come with Spark, we built
> Spark 1.2.2 with Hadoop 2.2.0 + Hive 0.12 ourselves, and deployed it on
> the same cluster. IBM BigInsight comes with IBM JDK 1.7, but during our
> experience on the stage environment, we found out Spark works better with
> the Oracle JVM. So we run Spark under Oracle JDK 1.7.0_79. Now on
> production, we are facing an issue we never faced before, nor can
> reproduce on our staging cluster.
>
> We are using a Spark Standalone cluster, and here are the basic
> configurations:
>
> more spark-env.sh
> export JAVA_HOME=/opt/java
> export PATH=$JAVA_HOME/bin:$PATH
> export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
> export SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib/db2jcc4-10.6.jar
> export SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
> export SPARK_MASTER_WEBUI_PORT=8081
> export SPARK_MASTER_IP=host1
> export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=42"
> export SPARK_WORKER_MEMORY=24g
> export SPARK_WORKER_CORES=6
> export SPARK_WORKER_DIR=/tmp/spark/work
> export SPARK_DRIVER_MEMORY=2g
> export SPARK_EXECUTOR_MEMORY=2g
>
> more spark-defaults.conf
> spark.master spark://host1:7077
> spark.eventLog.enabled true
> spark.eventLog.dir hdfs://host1:9000/spark/eventLog
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>
> We are using the AVRO file format a lot, and we have these 2 datasets: one
> is about 96G, and the other is a little over 1T. Since we are using AVRO,
> we also built spark-avro at commit a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8
> (https://github.com/databricks/spark-avro/tree/a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8),
> which is the latest version supporting Spark 1.2.x.
>
> Here is the problem we are facing on our production cluster: even the
> following simple spark-shell commands will hang.
>
> import org.apache.spark.sql.SQLContext
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
> import com.databricks.spark.avro._
> val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
> bigData.registerTempTable("bigData")
> bigData.count()
>
> From the console, we saw the following, with no update for more than 30
> minutes and longer:
>
> [Stage 0: (44 + 42) / 7800]
>
> The big dataset with 1T should generate 7800 HDFS blocks, but Spark's HDFS
> client looks like it has problems reading them.
>
> Since we are running Spark on the data nodes, all the Spark tasks run at
> NODE_LOCAL locality level. If I go to a data/task node on which Spark
> tasks hang and use JStack to dump the threads, I get the following at the
> top:
>
> 2015-08-11 15:38:38
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):
>
> "Attach Listener" daemon prio=10 tid=0x7f0660589000 nid=0x1584d waiting on condition [0x]
>    java.lang.Thread.State: RUNNABLE
>
> "org.apache.hadoop.hdfs.PeerCache@4a88ec00" daemon prio=10 tid=0x7f06508b7800 nid=0x13302 waiting on condition [0x7f060be94000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:252)
>     at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
>     at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
>     at java.lang.Thread.run(Thread.java:745)
>
> "shuffle-client-1" daemon prio=10 tid=0x7f0650687000 nid=0x132fc runnable [0x7f060d198000]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>     - locked 0x00067bf47710 (a io.netty.channel.nio.SelectedSelectionKeySet)
>     - locked 0x00067bf374e8 (a java.util.Collections$UnmodifiableSet)
Re: Spark Job Hangs on our production cluster
Sorry, by repl I mean spark-shell; I guess I'm used to them being used interchangeably.

From that thread dump, the one thread that isn't stuck is trying to get classes specifically related to the shell / repl:

java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    - locked 0x00072477d530 (a java.io.BufferedInputStream)
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:689)
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
    - locked 0x000724772bf8 (a sun.net.www.protocol.http.HttpURLConnection)
    at java.net.URL.openStream(URL.java:1037)
    at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:86)
    at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:63)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    ...

That's because the repl needs to package up the code for every single line, and it serves those compiled classes to each executor over http. This particular executor seems to be stuck pulling one of those lines compiled in the repl. (This is all assuming that the thread dump is the same over the entire 30 minutes that Spark seems to be stuck.)

Yes, the classes should be loaded for the first partition that is processed. (There certainly could be cases where different classes are needed for each partition, but it doesn't sound like you are doing anything that would trigger this.) But to be clear, in repl mode, there will be additional classes to be sent with every single job.

Hope that helps a little more ...
Maybe there was some issue w/ 1.2.2, though I didn't see anything with a quick search; hopefully you'll have more luck w/ 1.3.1.

On Tue, Aug 18, 2015 at 2:23 PM, java8964 <java8...@hotmail.com> wrote:
> Hi, Imran:
>
> Thanks for your reply. I am not sure what you mean by repl. Can you give
> more detail about that?
>
> This only happens when Spark 1.2.2 tries to scan a big data set, and it
> cannot be reproduced when it scans a smaller dataset.
>
> FYI, I have built and deployed Spark 1.3.1 on our production cluster.
> Right now, I cannot reproduce this hang problem on the same cluster for
> the same big dataset. On this point, we will continue trying Spark 1.3.1;
> hopefully we will have a more positive experience with it.
>
> But just wondering, what class does Spark need to load at this time? From
> my understanding, the executor already scanned the first block of data
> from HDFS, and hangs while starting the 2nd block. All the classes should
> already be loaded in the JVM in this case.
>
> Thanks
>
> Yong
>
> --
> From: iras...@cloudera.com
> Date: Tue, 18 Aug 2015 12:17:56 -0500
> Subject: Re: Spark Job Hangs on our production cluster
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> just looking at the thread dump from your original email, the 3 executor
> threads are all trying to load classes. (One thread is actually loading
> some class, and the others are blocked waiting to load a class, most
> likely trying to load the same thing.) That is really weird, definitely
> not something which should keep things blocked for 30 min. It suggests
> something wrong w/ the jvm, or classpath configuration, or a combination.
> Looks like you are trying to run in the repl, and for whatever reason the
> http server for the repl to serve classes is not responsive. I'd try
> running outside of the repl and see if that works. sorry not a full
> diagnosis but maybe this'll help a bit.
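The "try running outside of the repl" suggestion above amounts to packaging the same logic as an application jar and launching it through spark-submit rather than spark-shell, so executors load classes from the jar on their classpath instead of fetching repl-compiled classes over HTTP. A sketch (the jar and class names are placeholders):

```shell
spark-submit \
  --master spark://host1:7077 \
  --class com.example.BigDataCount \
  bigdata-count.jar
```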
Re: Spark runs into an Infinite loop even if the tasks are completed successfully
Oh I see, you are defining your own RDD Partition types, and you had a bug where partition.index did not line up with the partition's slot in rdd.getPartitions. Is that correct?

On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> I figured that out, and these are my findings:
>
> - It just enters an infinite loop when there's a duplicate partition id.
> - It enters an infinite loop when the partition id starts from 1 rather
>   than 0.
>
> Something like this piece of code (in getPartitions()) can reproduce it:
>
> val total_partitions = 4
> val partitionsArray: Array[Partition] = Array.ofDim[Partition](total_partitions)
> var i = 0
> for (outer <- 0 to 1) {
>   for (partition <- 1 to total_partitions) {
>     partitionsArray(i) = new DeadLockPartitions(partition)
>     i = i + 1
>   }
> }
> partitionsArray
>
> Thanks
> Best Regards
>
> On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid <iras...@cloudera.com> wrote:
>> yikes. Was this a one-time thing? Or does it happen consistently? Can you
>> turn on debug logging for o.a.s.scheduler (dunno if it will help, but
>> maybe ...)
>>
>> On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>> Hi
>>>
>>> My Spark job (running in local[*] with spark 1.4.1) reads data from a
>>> thrift server (I created an RDD; it computes the partitions in the
>>> getPartitions() call, and in compute() hasNext returns records from
>>> these partitions). count() and foreach() work fine and return the
>>> correct number of records. But whenever there is a shuffle map stage
>>> (like reduceByKey etc.), all the tasks execute properly but then it
>>> enters an infinite loop saying:
>>>
>>> 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1 (map
>>> at FilterMain.scala:59) because some of its tasks had failed: 0, 3
>>>
>>> Here's the complete stack-trace: http://pastebin.com/hyK7cG8S
>>>
>>> What could be the root cause of this problem? I looked it up and bumped
>>> into this closed JIRA (which is very, very old):
>>> https://issues.apache.org/jira/browse/SPARK-583
>>>
>>> Thanks
>>> Best Regards
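The invariant that the reproduction above violates is that rdd.getPartitions must return an array where the partition stored at slot i reports index i -- no duplicates, starting at 0 -- otherwise the scheduler's bookkeeping of finished partitions never converges. A small Python sketch of that validity check (Spark does not expose such a validator; this is just an illustration of the rule):

```python
def valid_partition_indices(indices):
    """A partitions array is only valid if its indices are exactly
    0, 1, ..., n-1 in slot order (no duplicates, no offset)."""
    return all(idx == slot for slot, idx in enumerate(indices))

# The reproduction in the thread builds 8 partitions whose indices are
# 1..4 repeated twice -- duplicated ids, and starting at 1 rather than 0:
buggy = [p for _ in range(2) for p in range(1, 5)]
print(valid_partition_indices(buggy))           # False
print(valid_partition_indices(list(range(8))))  # True
```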
Re: Spark runs into an Infinite loop even if the tasks are completed successfully
Yikes. Was this a one-time thing? Or does it happen consistently? Can you turn on debug logging for o.a.s.scheduler (dunno if it will help, but maybe ...)

On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> Hi
>
> My Spark job (running in local[*] with spark 1.4.1) reads data from a
> thrift server (I created an RDD; it computes the partitions in the
> getPartitions() call, and in compute() hasNext returns records from these
> partitions). count() and foreach() work fine and return the correct number
> of records. But whenever there is a shuffle map stage (like reduceByKey
> etc.), all the tasks execute properly but then it enters an infinite loop
> saying:
>
> 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1 (map
> at FilterMain.scala:59) because some of its tasks had failed: 0, 3
>
> Here's the complete stack-trace: http://pastebin.com/hyK7cG8S
>
> What could be the root cause of this problem? I looked it up and bumped
> into this closed JIRA (which is very, very old):
> https://issues.apache.org/jira/browse/SPARK-583
>
> Thanks
> Best Regards
Re: takeSample() results in two stages
It launches two jobs because it doesn't know ahead of time how big your RDD is, so it doesn't know what the sampling rate should be. After counting all the records, it can determine what the sampling rate should be -- then it does another pass through the data, sampling at the rate it's just determined.

Note that this suggests: (a) if you know the size of your RDD ahead of time, you could eliminate that first pass, and (b) since you end up computing the input RDD twice, it may make sense to cache it.

On Thu, Jun 11, 2015 at 11:43 AM, barmaley <o...@solver.com> wrote:
> I've observed interesting behavior in Spark 1.3.1, the reason for which is
> not clear. Doing something as simple as sc.textFile(...).takeSample(...)
> always results in two stages:
> [screenshot: http://apache-spark-user-list.1001560.n3.nabble.com/file/n23280/Capture.jpg]
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/takeSample-results-in-two-stages-tp23280.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
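The two-pass behavior described above can be sketched without Spark: the first pass only counts, and the second samples at a rate derived from that count. (This is a simplified model of takeSample's sampling path; the real implementation oversamples and retries to guarantee exactly k results.)

```python
import random

def take_sample(records, k, seed=42):
    # Pass 1 ("stage 1"): count the records, since the size isn't known up front.
    n = sum(1 for _ in records)
    if k >= n:
        return list(records)
    # Pass 2 ("stage 2"): sample each record independently at rate k/n.
    rate = k / n
    rng = random.Random(seed)
    return [r for r in records if rng.random() < rate]

data = list(range(1000))
sample = take_sample(data, 10)
print(len(sample))  # roughly 10, varying with the seed
```

Caching the input (point (b) above) matters precisely because `records` is traversed twice.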
Re: flatMap output on disk / flatMap memory overhead
I agree with Richard. It looks like the issue here is shuffling, and shuffle data is always written to disk, so the issue is definitely not that all the output of flatMap has to be stored in memory.

If at all possible, I'd first suggest upgrading to a new version of Spark -- even in 1.2, there were big improvements to shuffle, with sort-based shuffle as the default.

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher <rmarsc...@localytics.com> wrote:
> Are you sure it's memory related? What is the disk utilization and IO
> performance on the workers? The error you posted looks to be related to
> shuffle trying to obtain block data from another worker node and failing
> to do so in a reasonable amount of time. It may still be memory related,
> but I'm not sure that other resources are ruled out yet.
>
> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <octavian.ga...@inf.ethz.ch> wrote:
>> I tried using reduceByKey, without success. I also tried this:
>> rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the
>> same error as before, namely the error described here:
>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>
>> My task is to count the frequencies of pairs of words that occur in a set
>> of documents at least 5 times. I know that this final output is sparse
>> and should comfortably fit in memory. However, the intermediate pairs
>> that are spilled by flatMap might need to be stored on disk, but I don't
>> understand why the persist option does not work and my job fails.
>>
>> My code:
>>
>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>    .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
>>    .reduceByKey((a, b) => (a + b).toShort)
>>    .filter({ case ((x, y), count) => count >= 5 })
>>
>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node.
>> One node I keep for the master, 7 nodes for the workers.
>>
>> my conf:
>>
>> conf.set("spark.cores.max", "128")
>> conf.set("spark.akka.frameSize", "1024")
>> conf.set("spark.executor.memory", "115g")
>> conf.set("spark.shuffle.file.buffer.kb", "1000")
>>
>> my spark-env.sh:
>>
>> ulimit -n 20
>> SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
>> SPARK_DRIVER_MEMORY=129G
>>
>> spark version: 1.1.1
>>
>> Thank you a lot for your help!
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
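The aggregation the job above performs is itself straightforward; a driver-side Python sketch of the same logic (count word pairs across documents, keep those seen at least 5 times), just to show the shape of the computation that the flatMap/reduceByKey pipeline does in a distributed way:

```python
from collections import Counter
from itertools import combinations

def pair_frequencies(documents, min_count=5):
    counts = Counter()
    for doc in documents:
        words = doc.split()
        # Emit each unordered pair of distinct words in the document.
        for pair in combinations(sorted(set(words)), 2):
            counts[pair] += 1
    # Keep only pairs occurring at least min_count times -- the sparse final output.
    return {pair: c for pair, c in counts.items() if c >= min_count}

docs = ["spark shuffle disk"] * 6 + ["spark memory"] * 3
print(pair_frequencies(docs))
```

The intermediate `counts` here corresponds to the shuffle data that Spark spills to disk; only the filtered result needs to fit comfortably in driver memory.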
Re: Question about Serialization in Storage Level
Hi Zhipeng,

Yes, your understanding is correct. The SER portion just refers to how it's stored in memory. On disk, the data always has to be serialized.

On Fri, May 22, 2015 at 10:40 PM, Jiang, Zhipeng <zhipeng.ji...@intel.com> wrote:
> Hi Todd, Howard,
>
> Thanks for your reply; I might not have presented my question clearly.
>
> What I mean is, if I call rdd.persist(StorageLevel.MEMORY_AND_DISK), the
> BlockManager will cache the rdd in the MemoryStore. The RDD will be
> migrated to the DiskStore when it cannot fit in memory. I think this
> migration does require data serialization and compression (if
> spark.rdd.compress is set to true). So the data on disk is serialized,
> even if I didn't choose a serialized storage level -- am I right?
>
> Thanks,
> Zhipeng
>
> From: Todd Nist [mailto:tsind...@gmail.com]
> Sent: Thursday, May 21, 2015 8:49 PM
> To: Jiang, Zhipeng
> Cc: user@spark.apache.org
> Subject: Re: Question about Serialization in Storage Level
>
> From the docs
> (https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence):
>
> MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD
> does not fit in memory, some partitions will not be cached and will be
> recomputed on the fly each time they're needed. This is the default level.
>
> MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the
> RDD does not fit in memory, store the partitions that don't fit on disk,
> and read them from there when they're needed.
>
> MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per
> partition). This is generally more space-efficient than deserialized
> objects, especially when using a fast serializer
> (https://spark.apache.org/docs/latest/tuning.html), but more CPU-intensive
> to read.
>
> MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that
> don't fit in memory to disk instead of recomputing them on the fly each
> time they're needed.
>
> On Thu, May 21, 2015 at 3:52 AM, Jiang, Zhipeng <zhipeng.ji...@intel.com> wrote:
>> Hi there,
>>
>> This question may seem kind of naïve, but what's the difference between
>> MEMORY_AND_DISK and MEMORY_AND_DISK_SER? If I call
>> rdd.persist(StorageLevel.MEMORY_AND_DISK), the BlockManager won't
>> serialize the rdd?
>>
>> Thanks,
>> Zhipeng
Re: Spark and logging
Only an answer to one of your questions:

> What about log statements in the partition processing functions? Will
> their log statements get logged into a file residing on a given 'slave'
> machine, or will Spark capture this log output and divert it into the log
> file of the driver's machine?

They get logged to files on the remote nodes. You can view the logs for each executor through the UI. If you are using Spark on YARN, you can grab all the logs with "yarn logs".
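For reference, the "yarn logs" aggregation mentioned above is invoked with the application id (a sketch; the id is a placeholder, and log aggregation must be enabled on the cluster for this to return anything):

```shell
yarn logs -applicationId <applicationId>
```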
Re: FetchFailedException and MetadataFetchFailedException
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:933)
    at org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:165)
    at org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:162)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.storage.DiskBlockManager.org$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:162)
    at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:156)
    at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1208)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:88)
    at org.apache.spark.executor.Executor.stop(Executor.scala:146)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:105)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:38)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

On Tue, May 19, 2015 at 3:38 AM, Imran Rashid <iras...@cloudera.com> wrote:
> Hi, can you take a look at the logs and see what the first error you are
> getting is? It's possible that the file doesn't exist when that error is
> produced, but it shows up later -- I've seen similar things happen, but
> only after there have already been some errors. But if you see that in
> the very first error, then I'm not sure what the cause is. It would be
> helpful for you to send the logs.
>
> Imran
>
> On Fri, May 15, 2015 at 10:07 AM, rok <rokros...@gmail.com> wrote:
>> I am trying to sort a collection of key,value pairs (between several
>> hundred million and a few billion) and have recently been getting lots
>> of FetchFailedException errors that seem to originate when one of the
>> executors doesn't find a temporary shuffle file on disk. E.g.:
>>
>> org.apache.spark.shuffle.FetchFailedException:
>> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
>> (No such file or directory)
>>
>> This file actually exists:
>>
>> ls -l /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
>> -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
>>
>> This error repeats on several executors and is followed by a number of:
>>
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>> location for shuffle 0
>>
>> This results in most tasks being lost and executors dying. There is
>> plenty of space on all of the appropriate filesystems, so none of the
>> executors are running out of disk space. I am running this via YARN on
>> approximately 100 nodes with 2 cores per node. Any thoughts on what
>> might be causing these errors?
>>
>> Thanks!
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: EOFException using KryoSerializer
Hi Jim, this is definitely strange. It sure sounds like a bug, but it is also a very commonly used code path, so at the very least you must be hitting a corner case. Could you share a little more info with us? What version of Spark are you using? How big is the object you are trying to broadcast? Can you share more of the logs from before the exception? It is not too surprising this shows up on Mesos but not in local mode. Local mode never exercises the part of the code that needs to deserialize the blocks of a broadcast variable (though it actually does serialize the data into blocks). So I doubt it's Mesos-specific; more likely it would happen in any cluster mode -- YARN, standalone, or even local-cluster (a pseudo-cluster just for testing). Imran On Tue, May 19, 2015 at 3:56 PM, Jim Carroll jimfcarr...@gmail.com wrote: I'm seeing the following exception ONLY when I run on a Mesos cluster. If I run the exact same code with master set to local[N] I have no problem: 2015-05-19 16:45:43,484 [task-result-getter-0] WARN TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at 
org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) KryoSerializer explicitly throws an EOFException. The comment says: // DeserializationStream uses the EOF exception to indicate stopping condition. Apparently this isn't what TorrentBroadcast expects. Any suggestions? Thanks. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Broadcast variables can be rebroadcast?
hmm, I guess it depends on the way you look at it. In a way, I'm saying that spark does *not* have any built-in auto-re-broadcast if you try to mutate a broadcast variable. Instead, you should create something new, and just broadcast it separately. Then just have all the code operating on your RDDs look at the new broadcast variable. But I guess there is another way to look at it -- you are creating new broadcast variables each time, but they all point to the same underlying mutable data structure. So in a way, you are rebroadcasting the same underlying data structure. Let me expand my example from earlier a little bit more: def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... } // this is a val, because the data structure itself is mutable val myMutableDataStructure = ... // this is a var, because you will create new broadcasts var myBroadcast = sc.broadcast(myMutableDataStructure) (0 to 20).foreach { iteration => oneIteration(myRDD, myBroadcast) // update your mutable data structure in place myMutableDataStructure.update(...) // ... but that doesn't affect the broadcast variables living out on the cluster, so we need to // create a new one // this line is not required -- the broadcast var will automatically get unpersisted when a gc // cleans up the old broadcast on the driver, but I'm including this here for completeness, // in case you want to more proactively clean up old blocks if you are low on space myBroadcast.unpersist() // now we create a new broadcast which has the updated data in our mutable data structure myBroadcast = sc.broadcast(myMutableDataStructure) } hope this clarifies things! Imran On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote: Hi Imran, If I understood you correctly, you are suggesting to simply call broadcast again from the driver program. 
This is exactly what I am hoping will work, as I have the Broadcast data wrapped up and I am indeed (re)broadcasting the wrapper over again when the underlying data changes. However, the documentation seems to suggest that one cannot re-broadcast. Is my understanding accurate? Thanks NB On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com wrote: Rather than updating the broadcast variable, can't you simply create a new one? When the old one can be gc'ed in your program, it will also get gc'ed from spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add in another layer of indirection for which broadcast variable to use, but not too bad. E.g., from var myBroadcast = sc.broadcast(...) (0 to 20).foreach { iteration => // ... some rdd operations that involve myBroadcast ... myBroadcast.update(...) // wrong! don't update a broadcast variable } instead do something like: def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... } var myBroadcast = sc.broadcast(...) (0 to 20).foreach { iteration => oneIteration(myRDD, myBroadcast) myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it } On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote: Thanks Ayan. Can we rebroadcast after updating in the driver? Thanks NB. On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote: Hi, a broadcast variable is shipped the first time it is accessed in a transformation to the executors used by the transformation. It will NOT be updated subsequently, even if the value has changed. However, a new value will be shipped to any new executor that comes into play after the value has changed. This is why changing the value of a broadcast variable is not a good idea, as it can create inconsistency within the cluster. 
From the documentation: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable. On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB
Re: Error communicating with MapOutputTracker
On Fri, May 15, 2015 at 5:09 PM, Thomas Gerber thomas.ger...@radius.com wrote: Now, we noticed that we get java heap OOM exceptions on the output tracker when we have too many tasks. I wonder: 1. where does the map output tracker live? The driver? The master (when those are not the same)? 2. how can we increase the heap for it? Especially when using spark-submit? It does not live on the master -- that exists only in a standalone cluster, and it does very little work. (Though there are *Master and *Worker variants of the class, it's really running on the driver and the executors.) If you are getting OOMs in the MapOutputTrackerMaster (which lives on the driver), then you can increase the memory for the driver via the normal args for controlling driver memory, with --driver-memory 10G or whatever. Just to be clear, if you hit an OOM from somewhere in the MapOutputTracker code, it just means that code is what pushed things over the top. Of course you could have 99% of your memory used by something else, perhaps your own data structures, which perhaps could be trimmed down. You could get a heap dump on the driver to see where the memory is really getting used. Do you mind sharing the details of how you hit these OOMs? How much memory, how many partitions on each side of the shuffle? Sort-based shuffle, I assume? thanks, Imran
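To make that concrete, a hedged sketch of a submission with a larger driver heap. The class name, jar, and sizes are placeholders, not recommendations; the only point is that --driver-memory sizes the JVM where MapOutputTrackerMaster lives:

```shell
# Illustrative only -- class, jar, and sizes are placeholders.
# The MapOutputTrackerMaster runs inside the driver JVM, so raising
# --driver-memory is what gives it more headroom.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 10G \
  --class com.example.MyJob \
  myjob.jar
```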
Re: applications are still in progress?
Most likely, you never call sc.stop(). Note that in 1.4, this will happen for you automatically in a shutdown hook, taken care of by https://issues.apache.org/jira/browse/SPARK-3090 On Wed, May 13, 2015 at 8:04 AM, Yifan LI iamyifa...@gmail.com wrote: Hi, I have some applications finished(but actually failed before), that in WebUI show Application myApp is still in progress. and, in the eventlog folder, there are several log files like this: app-20150512***.inprogress So, I am wondering what the “inprogress” means… Thanks! :) Best, Yifan LI
Re: parallelism on binary file
You can use sc.hadoopFile (or any of its variants) to do what you want. They even let you reuse your existing HadoopInputFormats, so you should be able to mimic your old MR usage just fine. sc.textFile is just a convenience method which sits on top. imran On Fri, May 8, 2015 at 12:03 PM, tog guillaume.all...@gmail.com wrote: Hi, I have an application that currently runs using MR. It starts by extracting information from a proprietary binary file that is copied to HDFS, creating business objects from the information extracted from the binary files. Later those objects are used for further processing, again using MR jobs. I am planning to move towards Spark, and I clearly see that I could use a JavaRDD<BusinessObject> for parallel processing. However, it is not yet obvious what the process could be to generate this RDD from my binary file in parallel. Today I use parallelism based on the splits assigned to each of the map tasks of a job. Can I mimic such a thing using Spark? All the examples I have seen so far use text files, for which I guess the partitions are based on a given number of contiguous lines. Any help or pointer would be appreciated. Cheers Guillaume -- PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
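To make the sc.hadoopFile suggestion above concrete, a minimal sketch, assuming you already have a Hadoop InputFormat for the proprietary format -- MyBinaryInputFormat, BusinessObject, and the path are hypothetical placeholders:

```scala
import org.apache.hadoop.io.{BytesWritable, LongWritable}

// Each Hadoop input split becomes one Spark partition, just as each split
// became one map task in MR, so you keep the same parallelism.
val raw = sc.hadoopFile[LongWritable, BytesWritable, MyBinaryInputFormat](
  "hdfs:///data/binary-files")

// Build the business objects in parallel, one partition per split.
val businessObjects = raw.map { case (key, bytes) =>
  BusinessObject.parse(bytes.copyBytes())  // hypothetical parser
}
```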
Re: spark log field clarification
depends what you mean by output data. Do you mean: * the data that is sent back to the driver? that is result size * the shuffle output? that is in Shuffle Write Metrics * the data written to a hadoop output format? that is in Output Metrics On Thu, May 14, 2015 at 2:22 PM, yanwei echo@gmail.com wrote: I am trying to extract the *output data size* information for *each task*. What *field(s)* should I look for, given the json-format log? Also, what does Result Size stand for? Thanks a lot in advance! -Yanwei
Re: LogisticRegressionWithLBFGS with large feature set
I'm not super familiar with this part of the code, but from taking a quick look: a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles per feature (mean, max, min, etc.) b) The limit is on the result size from *all* tasks, not from one task. You start with 3072 tasks. c) treeAggregate should first merge things down to about 8 partitions before bringing results back to the driver, which is how you end up with 54 tasks at your failure. This means you should have about 30 MB per measure * 7 measures * 54 tasks, which comes to about 11 GB, or in the ballpark of what you found. In principle, you could get this working by adding more levels to the treeAggregate (the depth parameter), but it looks like that isn't exposed. You could also try coalescing your data down to a smaller set of partitions first, but that comes with other downsides. Perhaps an MLlib expert could chime in on an alternate approach. My feeling (from a very quick look) is that there is room for some optimization in the internals. Imran On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am trying to validate our modeling data pipeline by running LogisticRegressionWithLBFGS on a dataset with ~3.7 million features, basically to compute AUC. This is on Spark 1.3.0. I am using 128 executors with 4 GB each + driver with 8 GB. 
The number of data partitions is 3072 The execution fails with the following messages: *Total size of serialized results of 54 tasks (10.4 GB) is bigger than spark.driver.maxResultSize (3.0 GB)* The associated stage in the job is treeAggregate at StandardScaler.scala:52 http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0 : The call stack looks as below: org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996) org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52) org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233) org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190) I am trying to both understand why such large amount of data needs to be passed back to driver as well as figure out a way around this. I also want to understand how much memory is required, as a function of dataset size, feature set size, and number of iterations performed, for future experiments. From looking at the MLLib code, the largest data structure seems to be a dense vector of the same size as feature set. I am not familiar with algorithm or its implementation I would guess 3.7 million features would lead to a constant multiple of ~3.7 * 8 ~ 30 MB. So how does the dataset size become so large? I looked into the treeAggregate and it looks like hierarchical aggregation. If the data being sent to the driver is basically the aggregated coefficients (i.e. dense vectors) for the final aggregation, can't the dense vectors from executors be pulled in one at a time and merged in memory, rather than pulling all of them in together? (This is totally uneducated guess so i may be completely off here). Is there a way to get this running? Thanks, pala
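Since the depth parameter isn't exposed here, a hedged sketch of the two workarounds discussed above -- the partition count and result-size limit are illustrative guesses, not tuned values:

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

// Option 1: coalesce before training, so treeAggregate ships fewer
// partial summaries back toward the driver (at the cost of less
// parallelism and possibly skewed partitions).
val model = new LogisticRegressionWithLBFGS()
  .run(trainingData.coalesce(512))

// Option 2: raise the cap named in the error message (the driver still
// needs enough heap, via --driver-memory, to actually hold the results).
sparkConf.set("spark.driver.maxResultSize", "12g")
```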
Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted
Looks like this exception is after many more failures have occurred. It is already on attempt 6 for stage 7 -- I'd try to find out why attempt 0 failed. This particular exception is probably a result of corruption that can happen when stages are retried, that I'm working on addressing in https://issues.apache.org/jira/browse/SPARK-7308. But your real problem is figuring out why the stage failed in the first place. On Wed, May 13, 2015 at 6:01 AM, Yifan LI iamyifa...@gmail.com wrote: Hi, I was running our graphx application(worked finely on Spark 1.2.0) but failed on Spark 1.3.1 with below exception. Anyone has idea on this issue? I guess it was caused by using LZ4 codec? Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 7.6 failed 128 times, most recent failure: Lost task 54.127 in stage 7.6 (TID 5311, small15-tap1.common.lip6.fr): com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted at com.esotericsoftware.kryo.io.Input.fill(Input.java:142) at com.esotericsoftware.kryo.io.Input.require(Input.java:155) at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60) at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300) at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Stream is corrupted at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116) at com.esotericsoftware.kryo.io.Input.fill(Input.java:140) ... 
35 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at
Re: Spark on Yarn : Map outputs lifetime ?
Neither of those two. Instead, the shuffle data is cleaned up when the stages they are from get GC'ed by the JVM -- that is, when you are no longer holding any references to anything which points to the old stages, and there is an appropriate GC event. The data is not cleaned up right after the stage completes, because it might get used again later (e.g., if the stage is retried). On Tue, May 12, 2015 at 6:50 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: Hi, In Spark on YARN, when running spark_shuffle as an auxiliary service on the node manager, do the map spills of a stage get cleaned up once the next stage completes, OR are they preserved until the app completes (i.e., until all the stages complete)? -- Thanks, Ashwin
Re: Broadcast variables can be rebroadcast?
Rather than updating the broadcast variable, can't you simply create a new one? When the old one can be gc'ed in your program, it will also get gc'ed from spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add in another layer of indirection for which broadcast variable to use, but not too bad. E.g., from var myBroadcast = sc.broadcast(...) (0 to 20).foreach { iteration => // ... some rdd operations that involve myBroadcast ... myBroadcast.update(...) // wrong! don't update a broadcast variable } instead do something like: def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... } var myBroadcast = sc.broadcast(...) (0 to 20).foreach { iteration => oneIteration(myRDD, myBroadcast) myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it } On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote: Thanks Ayan. Can we rebroadcast after updating in the driver? Thanks NB. On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote: Hi, a broadcast variable is shipped the first time it is accessed in a transformation to the executors used by the transformation. It will NOT be updated subsequently, even if the value has changed. However, a new value will be shipped to any new executor that comes into play after the value has changed. This is why changing the value of a broadcast variable is not a good idea, as it can create inconsistency within the cluster. From the documentation: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable. On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. 
If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB -- Best Regards, Ayan Guha
Re: FetchFailedException and MetadataFetchFailedException
Hi, can you take a look at the logs and see what the first error you are getting is? It's possible that the file doesn't exist when that error is produced, but it shows up later -- I've seen similar things happen, but only after there have already been some errors. But if you see that in the very first error, then I'm not sure what the cause is. It would be helpful for you to send the logs. Imran On Fri, May 15, 2015 at 10:07 AM, rok rokros...@gmail.com wrote: I am trying to sort a collection of key,value pairs (between several hundred million and a few billion) and have recently been getting lots of FetchFailedException errors that seem to originate when one of the executors doesn't find a temporary shuffle file on disk. E.g.: org.apache.spark.shuffle.FetchFailedException: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index (No such file or directory) This file actually exists: ls -l /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index This error repeats on several executors and is followed by a number of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 This results in most tasks being lost and executors dying. There is plenty of space on all of the appropriate filesystems, so none of the executors are running out of disk space. Any idea what might be causing this? I am running this via YARN on approximately 100 nodes with 2 cores per node. Any thoughts on what might be causing these errors? Thanks! 
Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:
oh yeah, I think I remember we discussed this a while back ... sorry I forgot the details. If you know you don't have a graph, did you try setting spark.kryo.referenceTracking to false? I'm also confused about how you could hit this with a few million objects. Are you serializing them one at a time, or is there one big container which holds them all? Was there ever any follow-up from kryo? On Wed, May 6, 2015 at 2:29 AM, Tristan Blakers tris...@blackfrog.org wrote: Hi Imran, I had tried setting a really huge kryo buffer size (GBs), but it didn't make any difference. In my data sets, objects are no more than 1 KB each, and don't form a graph, so I don't think the buffer size should need to be larger than a few MB, except perhaps for reasons of efficiency. The exception usually occurs in com.esotericsoftware.kryo.util.IdentityObjectIntMap when it is resizing (or a similar operation), implying there are too many object references, though it's hard to see how I could get to 2 billion references from a few million objects... T On 6 May 2015 at 00:58, Imran Rashid iras...@cloudera.com wrote: Are you setting a really large max buffer size for kryo? Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405 ? If not, we should open up another issue to get a better warning in these cases. On Tue, May 5, 2015 at 2:47 AM, shahab shahab.mok...@gmail.com wrote: Thanks Tristan for sharing this. Actually this happens when I am reading a csv file of 3.5 GB. best, /Shahab On Tue, May 5, 2015 at 9:15 AM, Tristan Blakers tris...@blackfrog.org wrote: Hi Shahab, I've seen exceptions very similar to this (it also manifests as a negative array size exception), and I believe it's really a bug in Kryo. 
See this thread: http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccag02ijuw3oqbi2t8acb5nlrvxso2xmas1qrqd_4fq1tgvvj...@mail.gmail.com%3E It manifests in all of the following situations when working with an object graph in excess of a few GB: joins, broadcasts, and when using the hadoop save APIs. Tristan On 3 May 2015 at 07:26, Olivier Girardot ssab...@gmail.com wrote: Can you post your code? Otherwise there's not much we can do. Regards, Olivier. On Sat, May 2, 2015 at 9:15 PM, shahab shahab.mok...@gmail.com wrote: Hi, I am using spark-1.2.0 and I used Kryo serialization, but I get the following exception. java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1 I would appreciate if anyone could tell me how I can resolve this. best, /Shahab
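For reference, the spark.kryo.referenceTracking suggestion above would look like this. This is a sketch: it is only safe if your objects really form trees with no shared or cyclic references, since disabling tracking breaks serialization of shared references.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Skips Kryo's IdentityObjectIntMap bookkeeping, which is where the
  // IndexOutOfBoundsException above originates.
  .set("spark.kryo.referenceTracking", "false")
```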
Re: How to skip corrupted avro files
You might be interested in https://issues.apache.org/jira/browse/SPARK-6593 and the discussion around the PRs. This is probably more complicated than what you are looking for, but you could copy the code for HadoopReliableRDD in the PR into your own code and use it, without having to wait for the issue to get resolved. On Sun, May 3, 2015 at 12:57 PM, Shing Hing Man mat...@yahoo.com.invalid wrote: Hi, I am using Spark 1.3.1 to read a directory of about 2000 avro files. The avro files are from a third party and a few of them are corrupted. val path = {my directory of avro files} val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val data = sqlContext.avroFile(path); data.select(...) When I run the above code, I get the following exception. org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync! at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7] at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7] at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7] at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1] at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71] Caused by: java.io.IOException: Invalid sync! at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7] at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7] ... 25 common frames omitted Is there an easy way to skip a corrupted avro file without reading the files one by one using sqlContext.avroFile(file) ? 
At present, my solution (a hack) is to have my own version of org.apache.avro.file.DataFileStream whose hasNext method returns false (to signal the end of the file) when "java.io.IOException: Invalid sync!" is thrown. Please see line 210 in https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java

Thanks in advance for any assistance!
Shing
Re: How to deal with code that runs before foreach block in Apache Spark?
Gerard is totally correct -- to expand a little more, I think what you want to do is solrInputDocumentJavaRDD.foreachPartition, instead of solrInputDocumentJavaRDD.foreach:

solrInputDocumentJavaRDD.foreachPartition(
  new VoidFunction<Iterator<SolrInputDocument>>() {
    @Override
    public void call(Iterator<SolrInputDocument> docItr) {
      List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
      for (SolrInputDocument solrInputDocument : docItr) {
        // Add the solrInputDocument to the list of SolrInputDocuments
        docs.add(solrInputDocument);
      }
      // push things to solr **from the executor, for this partition**
      // so for this to make sense, you need to be sure solr can handle a bunch
      // of executors pushing into it simultaneously.
      addThingsToSolr(docs);
    }
  });

On Mon, May 4, 2015 at 8:44 AM, Gerard Maas gerard.m...@gmail.com wrote:

I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver' is a singleton, I guess what's going on when running on a cluster is that the call to SolrIndexerDriver.solrInputDocumentList.add(elem) is happening on different singleton instances of SolrIndexerDriver on different JVMs, while SolrIndexerDriver.solrServer.commit is happening on the driver. In practical terms, the lists on the executors are being filled in but never committed, and on the driver the opposite is happening.

-kr, Gerard

On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc emre.sev...@gmail.com wrote:

I'm trying to deal with some code that runs differently in Spark stand-alone mode and on a cluster. Basically, for each item in an RDD, I'm trying to add it to a list, and once this is done, I want to send this list to Solr. This works perfectly fine when I run the following code in stand-alone mode, but it does not work when the same code is run on a cluster. When I run the same code on a cluster, it is as if the send-to-Solr part of the code is executed before the list to be sent to Solr is filled with items.
I tried to force the execution with solrInputDocumentJavaRDD.collect() after the foreach, but it seems to have no effect.

// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
  new Function<JavaRDD<SolrInputDocument>, Void>() {
    @Override
    public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {
      // For each item in a single RDD
      solrInputDocumentJavaRDD.foreach(
        new VoidFunction<SolrInputDocument>() {
          @Override
          public void call(SolrInputDocument solrInputDocument) {
            // Add the solrInputDocument to the list of SolrInputDocuments
            SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
          }
        });
      // Try to force execution
      solrInputDocumentJavaRDD.collect();
      // After having finished adding every SolrInputDocument to the list,
      // add it to the solrServer and commit, waiting for the commit to be flushed
      try {
        // Seems like when run in cluster mode, the list size is zero,
        // therefore the following part is never executed
        if (SolrIndexerDriver.solrInputDocumentList != null
            && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
          SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
          SolrIndexerDriver.solrServer.commit(true, true);
          SolrIndexerDriver.solrInputDocumentList.clear();
        }
      } catch (SolrServerException | IOException e) {
        e.printStackTrace();
      }
      return null;
    }
  }
);

What should I do so that the sending-to-Solr part executes after the SolrDocuments are added to solrInputDocumentList (and also works in cluster mode)?

-- Emre Sevinç
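The per-partition batching pattern suggested in the reply can be sketched without any Spark or Solr dependency. Here `pushBatch` is a hypothetical stand-in for the real SolrServer.add(...)/commit call; the point is that the partition's iterator is drained into a buffer on the executor and pushed in ONE bulk call per partition, instead of mutating a driver-side list from executors:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PartitionBatcher {
    // records what each bulk call received, so the behavior can be inspected
    static final List<List<String>> pushed = new ArrayList<>();

    // hypothetical stand-in for SolrServer.add(docs) + commit
    static void pushBatch(List<String> docs) {
        pushed.add(docs);
    }

    // the body that foreachPartition's VoidFunction<Iterator<...>>.call would run
    static void processPartition(Iterator<String> docItr) {
        List<String> docs = new ArrayList<>();
        while (docItr.hasNext()) {
            docs.add(docItr.next());   // accumulate the whole partition locally
        }
        if (!docs.isEmpty()) {
            pushBatch(docs);           // one bulk push per partition
        }
    }

    public static void main(String[] args) {
        // simulate two partitions, as foreachPartition would hand them to us
        processPartition(List.of("doc1", "doc2").iterator());
        processPartition(List.of("doc3").iterator());
        System.out.println(pushed);
    }
}
```

Because the push happens inside the partition function, it runs on the executor that holds the data, which is why Imran's caveat about many executors hitting Solr simultaneously applies.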
Re: Spark job concurrency problem
can you give your entire spark-submit command? Are you missing --executor-cores <num_cpu>? Also, if you intend to use all 6 nodes, you also need --num-executors 6

On Mon, May 4, 2015 at 2:07 AM, Xi Shen davidshe...@gmail.com wrote:

Hi, I have two small RDDs, each with about 600 records. In my code, I did:

val rdd1 = sc...cache()
val rdd2 = sc...cache()
val result = rdd1.cartesian(rdd2).repartition(num_cpu).map { case (a, b) => some_expensive_job(a, b) }

I ran my job on a YARN cluster with --master yarn-cluster; I have 6 executors, each with a large amount of memory. However, I noticed my job is very slow. I went to the RM page and found there are two containers: one is the driver, one is the worker. I guess this is correct? I went to the worker's log and monitored the detail. My app prints some information, so I can use it to estimate the progress of the map operation. Looking at the log, it feels like the jobs are done one by one sequentially, rather than #cpu at a time. I checked the worker node, and its CPUs are all busy.

Xi Shen
about.me/davidshen
Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:
Are you setting a really large max buffer size for Kryo? Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405? If not, we should open another issue to get a better warning in these cases.

On Tue, May 5, 2015 at 2:47 AM, shahab shahab.mok...@gmail.com wrote: Thanks Tristan for sharing this. Actually this happens when I am reading a CSV file of 3.5 GB. best, /Shahab

On Tue, May 5, 2015 at 9:15 AM, Tristan Blakers tris...@blackfrog.org wrote:

Hi Shahab, I've seen exceptions very similar to this (it also manifests as a negative array size exception), and I believe it's really a bug in Kryo. See this thread: http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccag02ijuw3oqbi2t8acb5nlrvxso2xmas1qrqd_4fq1tgvvj...@mail.gmail.com%3E It manifests in all of the following situations when working with an object graph in excess of a few GB: joins, broadcasts, and when using the Hadoop save APIs. Tristan

On 3 May 2015 at 07:26, Olivier Girardot ssab...@gmail.com wrote: Can you post your code? Otherwise there's not much we can do. Regards, Olivier.

Le sam. 2 mai 2015 à 21:15, shahab shahab.mok...@gmail.com a écrit :

Hi, I am using Spark 1.2.0 and Kryo serialization, but I get the following exception:

java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1

I would appreciate it if anyone could tell me how I can resolve this. best, /Shahab
Re: ReduceByKey and sorting within partitions
oh wow, that is a really interesting observation, Marco & Jerry. I wonder if this is worth exposing in combineByKey()? I think Jerry's proposed workaround is all you can do for now -- use reflection to side-step the fact that the methods you need are private.

On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

Hi Marco, as far as I know, the current combineByKey() does not expose an argument for setting keyOrdering on the ShuffledRDD. Since ShuffledRDD is package private, if you can get at the ShuffledRDD through reflection or some other way, the keyOrdering you set will be pushed down to the shuffle. If you use a combination of transformations to do it, the result will be the same but the efficiency may differ: some transformations will separate into different stages, which introduces additional shuffles.

Thanks, Jerry

2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com:

Hi, after reducing by key, I'm trying to get data ordered both among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartitions), pushing the sorting down to the shuffle machinery of the reduce phase. I think, but maybe I'm wrong, that the correct way to do that is for combineByKey to call setKeyOrdering on the ShuffledRDD that it returns. Am I wrong? Can it be done by a combination of other transformations with the same efficiency?

Thanks, Marco

---
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: spark kryo serialization question
yes, you should register all three. The truth is, you only *need* to register classes that will get serialized -- either via RDD caching or in a shuffle. So if a type is only used as an intermediate inside a stage, you don't need to register it. But the overhead of registering extra classes is pretty minimal, so as long as you do this within reason, I think you're OK.

Imran

On Thu, Apr 30, 2015 at 12:34 AM, 邓刚[技术中心] triones.d...@vipshop.com wrote:

Hi all, we know that Spark supports Kryo serialization. Suppose there is a map function which maps C to (K, V) (here c, k, v are instances of classes C, K, V); when we enable Kryo serialization, should I register all three of these classes?

Best wishes,
Triones Deng
Re: Spark partitioning question
Hi Marius, I am also a little confused -- are you saying that myPartitioner is basically something like:

class MyPartitioner extends Partitioner {
  def numPartitions = 1
  def getPartition(key: Any) = 0
}

?? If so, I don't understand how you'd ever end up with data in two partitions. Indeed, then everything before the call to partitionBy(myPartitioner) is somewhat irrelevant. The important point is that partitionBy should put all the data in one partition, and the operations after that do not move data between partitions. So if you're really observing data in two partitions, it would be good to know more about what version of Spark you are on, your data, etc., as it sounds like a bug. But I have a feeling there is some misunderstanding about what your partitioner is doing. E.g., I think doing groupByKey followed by sortByKey doesn't make a lot of sense -- in general one sortByKey is all you need (it's not exactly the same, but most probably close enough, and it avoids doing another expensive shuffle). If you can share a bit more information on your partitioner, and what properties you need for your f, that might help.

thanks, Imran

On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com wrote:

Hello all, I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
  .mapPartitionsToPair(...)
  .groupByKey()
  .sortByKey(comparator)
  .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
  .mapPartitionsToPair( f )

The input data has 2 input splits (YARN 2.6.0). myPartitioner puts all the records in partition 0, which is correct, so the intuition is that f, provided to the last transformation (mapPartitionsToPair), would run sequentially inside a single YARN container. However, from the YARN logs I see that both YARN containers are processing records from the same partition ...
and *sometimes* the overall job fails (due to the code in f, which expects a certain order of records): YARN container 1 receives the records as expected, whereas YARN container 2 receives a subset of the records, for a reason I cannot explain, and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails ... apparently due to inconsistent propagation of the sorted records to the YARN containers. If any of this makes any sense to you, please let me know what I am missing.

Best, Marius
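For reference, the partitioner shape Imran asks about can be written out in full. This is a plain-Java stand-in (the real class would extend org.apache.spark.Partitioner); with such a partitioner, getPartition returns 0 for every key, so all records should land in a single partition after partitionBy:

```java
public class SinglePartitioner {
    // mirrors Partitioner.numPartitions: this partitioner produces exactly one partition
    public int numPartitions() {
        return 1;
    }

    // mirrors Partitioner.getPartition: every key maps to partition 0
    public int getPartition(Object key) {
        return 0;
    }

    public static void main(String[] args) {
        SinglePartitioner p = new SinglePartitioner();
        System.out.println(p.getPartition("any-key"));   // always 0
    }
}
```

If records from such a partitioner ever show up in two containers, the partitioner itself cannot be the cause, which is Imran's point.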
Re: Extra stage that executes before triggering computation with an action
sortByKey() runs one job to sample the data, to determine what range of keys to put in each partition. There is a JIRA to change it to defer launching that job until the subsequent action, but it will still execute another stage: https://issues.apache.org/jira/browse/SPARK-1021

On Wed, Apr 29, 2015 at 5:57 PM, Tom Hubregtsen thubregt...@gmail.com wrote:

I'm not sure, but I wonder if, because you are using the Spark REPL, it may not represent what a normal runtime execution would look like, and is possibly eagerly running a partial DAG once you define an operation that would cause a shuffle. What happens if you set up your same set of commands [a-e] in a file and use the Spark REPL's `load` or `paste` command to load them all at once?

From Richard: I have also packaged it in a jar file (without [e], the debug string), and still see the extra stage before the other two that I would expect. Even when I remove [d], the action, I still see stage 0 being executed (and do not see stages 1 and 2). Again, a shortened log of stage 0:

INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at sortByKey), which has no missing parents
INFO DAGScheduler: ResultStage 0 (sortByKey) finished in 0.192 s
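The sampling job that causes the extra stage can be illustrated in miniature. This is NOT Spark's actual RangePartitioner code -- the boundary-picking rule below is a simplified assumption -- but it shows the two-step shape: first sample the keys to choose range bounds (the extra job), then route each key to the partition whose range contains it:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RangeBoundsSketch {
    // pick (numPartitions - 1) split points from a sorted sample of the keys;
    // in Spark this sample comes from the separate job sortByKey launches
    static List<Integer> rangeBounds(List<Integer> sampledKeys, int numPartitions) {
        List<Integer> sorted = new ArrayList<>(sampledKeys);
        Collections.sort(sorted);
        List<Integer> bounds = new ArrayList<>();
        for (int i = 1; i < numPartitions; i++) {
            bounds.add(sorted.get(i * sorted.size() / numPartitions));
        }
        return bounds;
    }

    // a key goes to the first partition whose upper bound exceeds it
    static int partitionFor(int key, List<Integer> bounds) {
        for (int i = 0; i < bounds.size(); i++) {
            if (key < bounds.get(i)) return i;
        }
        return bounds.size();   // beyond every bound: the last partition
    }

    public static void main(String[] args) {
        List<Integer> bounds = rangeBounds(List.of(5, 1, 9, 3, 7, 2, 8), 2);
        System.out.println(bounds);
        System.out.println(partitionFor(1, bounds) + " " + partitionFor(9, bounds));
    }
}
```

Because the bounds must be known before any shuffle output can be written, the sampling pass cannot be folded into the sort stage itself -- hence the stage Tom and Richard see even before an action runs.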
Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
Another issue is that HadoopRDD (which sc.textFile uses) might split input files, and even when it doesn't split, it doesn't guarantee that part-file numbers correspond to the partition numbers in the RDD. E.g., part-0 could go to partition 27.

On Apr 24, 2015 7:41 AM, Michal Michalski michal.michal...@boxever.com wrote: Of course after you do it, you probably want to call repartition(somevalue) on your RDD to get your parallelism back. Kind regards, Michał Michalski

On 24 April 2015 at 15:28, Michal Michalski michal.michal...@boxever.com wrote:

I did a quick test as I was curious about it too. I created a file with the numbers from 0 to 999, in order, line by line. Then I did:

scala> val numbers = sc.textFile("./numbers.txt")
scala> val zipped = numbers.zipWithUniqueId
scala> zipped.foreach(i => println(i))

The expected result, if the order were preserved, would be something like (0,0), (1,1), etc. Unfortunately, the output looks like this:

(126,1)
(223,2)
(320,3)
(1,0)
(127,11)
(2,10)
(...)

The workaround I found that works for my specific use case (relatively small input files) is setting the number of partitions explicitly to 1 when reading a single *text* file:

scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

Then the output is exactly as I would expect. I didn't dive into the code too much, but I took a very quick look at it and figured out - correct me if I missed something, it's Friday afternoon! ;-) - that this workaround will work fine for all input formats inheriting from org.apache.hadoop.mapred.FileInputFormat, including TextInputFormat, of course - see the implementation of the getSplits() method there ( http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29 ).
The numSplits variable passed there is exactly the same value as you provide as the second argument to textFile, which is minPartitions. However, while *min* suggests that we can only define a minimum number of partitions with no control over the max, from what I can see in the code, that value specifies the *exact* number of partitions per the FileInputFormat.getSplits implementation. Of course it can differ for other input formats, but in this case it should work just fine.

Kind regards, Michał Michalski

On 24 April 2015 at 14:05, Spico Florin spicoflo...@gmail.com wrote:

Hello! I know that HadoopRDD partitions are built based on the number of splits in HDFS. I'm wondering whether these partitions preserve the initial order of the data in the file. As an example, suppose I have an HDFS file (myTextFile) with these splits:

split 0 - line 1, ..., line k
split 1 - line k+1, ..., line k+n
split 2 - line k+n+1, ..., line k+n+m

and the code

val lines = sc.textFile("hdfs://myTextFile")
lines.zipWithIndex()

Will the order of the lines be preserved? I.e., (line 1, zipIndex 1), ..., (line k, zipIndex k), and so on? I found this question on Stack Overflow (http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd) whose answer intrigued me: "Essentially, RDD's zipWithIndex() method seems to do this, but it won't preserve the original ordering of the data the RDD was created from". Can you please confirm that this is the correct answer? Thanks. Florin
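The interleaved ids Michal saw follow directly from the id scheme zipWithUniqueId documents (item k of partition n gets id n + k * numPartitions). A plain-Java simulation, with the partition layout supplied explicitly, reproduces the effect -- ids are unique but follow partition structure, not the original line order:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ZipWithUniqueIdSketch {
    // simulate zipWithUniqueId over an explicit list of partitions:
    // item k of partition n is assigned id n + k * numPartitions
    static List<Map.Entry<String, Long>> zipWithUniqueId(List<List<String>> partitions) {
        int numPartitions = partitions.size();
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (int n = 0; n < numPartitions; n++) {
            List<String> part = partitions.get(n);
            for (int k = 0; k < part.size(); k++) {
                out.add(new SimpleEntry<>(part.get(k), (long) n + (long) k * numPartitions));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // a 4-line "file" read as two splits: ids come out 0, 2, 1, 3
        System.out.println(zipWithUniqueId(List.of(List.of("a", "b"), List.of("c", "d"))));
    }
}
```

So even when each partition preserves the order of its split, the ids interleave across partitions -- which is why forcing a single partition (Michal's workaround) restores the expected sequence.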
Re: When are TaskCompletionListeners called?
it's the latter -- after Spark gets to the end of the iterator (or if it hits an exception). So your example is good; that is exactly what it is intended for.

On Fri, Apr 17, 2015 at 12:23 PM, Akshat Aranya aara...@gmail.com wrote:

Hi, I'm trying to figure out when TaskCompletionListeners are called -- are they called at the end of the RDD's compute() method, or after the iteration through the iterator returned by compute() is completed? To put it another way, is this OK:

class DatabaseRDD[T] extends RDD[T] {
  def compute(...): Iterator[T] = {
    val session = // acquire a DB session
    TaskContext.get.addTaskCompletionListener { (context) =>
      session.release()
    }
    val iterator = session.query(...)
    iterator
  }
}
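The semantics Imran describes -- the callback fires when the iterator is exhausted, not when compute() returns it -- can be sketched in plain Java. This is a stand-in for Spark's TaskContext/TaskCompletionListener machinery, showing why a DB session acquired in compute() safely stays open while the task consumes rows:

```java
import java.util.Iterator;
import java.util.List;

public class CompletionIterator<T> implements Iterator<T> {
    private final Iterator<T> underlying;
    private final Runnable onComplete;
    private boolean completed = false;

    public CompletionIterator(Iterator<T> underlying, Runnable onComplete) {
        this.underlying = underlying;
        this.onComplete = onComplete;
    }

    @Override
    public boolean hasNext() {
        boolean has = underlying.hasNext();
        if (!has && !completed) {   // fire exactly once, at exhaustion
            completed = true;
            onComplete.run();
        }
        return has;
    }

    @Override
    public T next() {
        return underlying.next();
    }

    public static void main(String[] args) {
        boolean[] released = {false};
        CompletionIterator<Integer> it =
            new CompletionIterator<>(List.of(1, 2, 3).iterator(), () -> released[0] = true);
        System.out.println("before drain: " + released[0]);   // "session" still open here
        it.forEachRemaining(x -> {});                          // draining triggers the release
        System.out.println("after drain: " + released[0]);
    }
}
```

A real Spark listener also fires on task failure, which this sketch does not model; the exception path is handled by the scheduler, not the iterator.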
Re: How to persist RDD return from partitionBy() to disk?
https://issues.apache.org/jira/browse/SPARK-1061

note the proposed fix isn't to have Spark automatically know about the partitioner when it reloads the data, but at least to make it *possible* for that to be done at the application level.

On Fri, Apr 17, 2015 at 11:35 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:

I have a huge RDD[Document] with millions of items. I partitioned it using HashPartitioner and saved it as an object file. But when I load the object file back into an RDD, I lose the HashPartitioner. How do I preserve the partitioning when loading the object file? Here is the code:

val docVectors: RDD[DocVector] = computeRdd() // expensive calculation
val partitionedDocVectors: RDD[(String, DocVector)] =
  docVectors.keyBy(d => d.id).partitionBy(new HashPartitioner(16))
partitionedDocVectors.saveAsObjectFile("c:/temp/partitionedDocVectors.obj")

// At this point, I check the folder c:/temp/partitionedDocVectors.obj;
// it contains 16 parts: "part-0, part-1, ... part-00015"

// Now load the object file back
val partitionedDocVectors2: RDD[(String, DocVector)] =
  sc.objectFile("c:/temp/partitionedDocVectors.obj")

// Now partitionedDocVectors2 contains 956 parts and has no partitioner
println(s"partitions: ${partitionedDocVectors.partitions.size}") // returns 956
if (idAndDocVectors.partitioner.isEmpty) println("No partitioner") // it does print this line

So how can I preserve the partitioning of partitionedDocVectors on disk so I can load it back?

Ningjun
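The application-level fix SPARK-1061 points at amounts to re-applying the same partitioning rule after reload. Spark's HashPartitioner uses a non-negative modulo of the key's hashCode; the rule is approximated here in plain Java -- treat the exact formula as an assumption and verify it against your Spark version before relying on co-partitioning:

```java
public class HashPartitionSketch {
    // approximate HashPartitioner.getPartition: non-negative hashCode modulo
    static int hashPartition(Object key, int numPartitions) {
        int raw = key.hashCode() % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;   // keep the result non-negative
    }

    public static void main(String[] args) {
        // the same key deterministically lands in the same of the 16 partitions,
        // so re-partitioning after a save/reload round trip restores co-location
        System.out.println(hashPartition("doc-42", 16));
    }
}
```

In Spark terms, the reloaded RDD would be passed through partitionBy(new HashPartitioner(16)) again; the shuffle cost is paid once, after which joins against similarly partitioned data avoid further shuffles.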
Re: Can't get SparkListener to work
when you start the spark-shell, it's already too late to get the ApplicationStart event. Try listening for StageCompleted or JobEnd instead.

On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji secondorderpolynom...@gmail.com wrote:

I'm trying to create a simple SparkListener to get notified of errors on executors. I do not get any callbacks on my SparkListener. Here is some simple code I'm executing in spark-shell, but I still don't get any callbacks on my listener. Am I doing something wrong? Thanks for any clue you can send my way.

Cheers, Praveen

==
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.SparkException

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println("onApplicationStart: " + applicationStart.appName)
  }
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("onApplicationEnd: " + applicationEnd.time)
  }
})

sc.parallelize(List(1, 2, 3)).map(_ => throw new SparkException("test")).collect()
===

output:

scala> org.apache.spark.SparkException: hshsh
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC.<init>(<console>:36)
at $iwC.<init>(<console>:38)
Re: Execption while using kryo with broadcast
this is a really strange exception ... I'm especially surprised that it doesn't work w/ java serialization. Do you think you could try to boil it down to a minimal example?

On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com wrote: Yes, without Kryo it did work out. When I removed the Kryo registration, it worked.

On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote: It's not working in combination with broadcast. Without Kryo it's also not working.

On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote: Is it working without Kryo? Thanks, Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote:

Hi all, I am getting the below exception while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in stage 4.0 (TID 7)
java.io.IOException: java.lang.UnsupportedOperationException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at java.util.AbstractMap.put(AbstractMap.java:203)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
... 18 more

15/04/15 12:58:51 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
15/04/15 12:58:51 INFO storage.MemoryStore: MemoryStore cleared
15/04/15 12:58:51 INFO storage.BlockManager: BlockManager stopped
15/04/15 12:58:51 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
Re: Execption while using kryo with broadcast
oh interesting. The suggested workaround is to wrap the result from collectAsMap into another HashMap; you should try that:

Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
Map<Long, MatcherReleventData> tmp = new HashMap<Long, MatcherReleventData>(matchData);
final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(tmp);

Can you please clarify:
* Does it work w/ java serialization in the end? Or is this kryo only?
* Which Spark version are you using? (One of the relevant bugs was fixed in 1.2.1 and 1.3.0.)

On Wed, Apr 15, 2015 at 9:06 AM, Jeetendra Gangele gangele...@gmail.com wrote:

This looks like a known issue? Check this out:
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html

Can you please suggest any workaround? I am broadcasting a HashMap returned from RDD.collectAsMap().
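The wrap-in-a-fresh-HashMap workaround can be demonstrated with plain Java collections. Here `Map.of` stands in for the map handed back by collectAsMap() (like the SerializableMapWrapper in the linked thread, it rejects mutation); copying it into a real HashMap yields an ordinary, fully mutable map that serializers can repopulate:

```java
import java.util.HashMap;
import java.util.Map;

public class BroadcastMapWorkaround {
    // copy the (possibly wrapper) map into a plain, fully mutable HashMap
    // before handing it to broadcast()
    static Map<Long, String> copyForBroadcast(Map<Long, String> collected) {
        return new HashMap<>(collected);
    }

    public static void main(String[] args) {
        // stand-in for RddForMarch.collectAsMap(); Map.of is immutable,
        // so put() on it would throw UnsupportedOperationException
        Map<Long, String> collected = Map.of(1L, "matchA", 2L, "matchB");

        Map<Long, String> tmp = copyForBroadcast(collected);
        tmp.put(3L, "matchC");   // works on the copy; would throw on 'collected'
        System.out.println(tmp.size());
    }
}
```

The Kryo stack trace above fails inside AbstractMap.put while deserializing, i.e. Kryo tries to rebuild the map by calling put() on an instance that forbids it; a plain HashMap sidesteps exactly that.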
Re: Regarding benefits of using more than one cpu for a task in spark
Hi Twinkle, to be completely honest, I'm not sure; I had never heard of spark.task.cpus before. But I can imagine two different use cases:

a) instead of just relying on Spark's creation of tasks for parallelism, a user wants to run multiple threads *within* a task. This is sort of going against the programming model of Spark, but I guess this feature is meant to give you the bare minimum support you need in case you really want it. E.g., maybe you have some existing library you want to use in each task which is already multi-threaded, or you pipe to some external program. Or maybe you even do something custom yourself -- e.g., you have some coordination between threads that Spark doesn't give you between tasks.

b) as a simple way to tune resource management. E.g., you could initially have your cluster configured to overcount cores for hyperthreading, but then set spark.task.cpus to 2 if you don't want to count hyperthreading. Or perhaps you want to leave some cores open for all the other work going on -- GC, network IO, etc. (But then again, this is a strange setting to use for that -- you'd probably just want some fixed number of cores to count, not a multiplier.)

On Tue, Apr 7, 2015 at 2:01 AM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi, in Spark there are two settings regarding the number of cores: one at the task level, spark.task.cpus, and another which drives the number of cores per executor, spark.executor.cores. Apart from using more than one core for a task which has to call some other external API etc., is there any other use case / benefit of assigning more than one core to a task? As far as I can see in the code, this is only used for scheduling etc.; RDD partitions etc. remain untouched by this setting. Does this mean that the coder needs to write the application logic to take advantage of this setting? (Which again makes me wonder about this setting.) Comments please.

Thanks, Twinkle
Re: Registering classes with KryoSerializer
hmm, I dunno why IntelliJ is unhappy, but you can always fall back to getting a class from the String: Class.forName(scala.reflect.ClassTag$$anon$1) perhaps the class is package private or something, and the repl somehow subverts it ... On Tue, Apr 14, 2015 at 5:44 PM, Arun Lists lists.a...@gmail.com wrote: Hi Imran, Thanks for the response! However, I am still not there yet. In the Scala interpreter, I can do: scala classOf[scala.reflect.ClassTag$$anon$1] but when I try to do this in my program in IntelliJ, it indicates an error: Cannot resolve symbol ClassTag$$anon$1 Hence I am not any closer to making this work. If you have any further suggestions, they would be most welcome. arun On Tue, Apr 14, 2015 at 2:33 PM, Imran Rashid iras...@cloudera.com wrote: Hi Arun, It can be hard to use kryo with required registration because of issues like this -- there isn't a good way to register all the classes that you need transitively. In this case, it looks like one of your classes has a reference to a ClassTag, which in turn has a reference to some anonymous inner class. I'd suggest (a) figuring out whether you really want to be serializing this thing -- its possible you're serializing an RDD which keeps a ClassTag, but normally you wouldn't want to serialize your RDDs (b) you might want to bring this up w/ chill -- spark offloads most of the kryo setup for all the scala internals to chill, I'm surprised they don't handle this already. 
Looks like they still handle ClassManifests, which are from pre-scala 2.10: https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189 (c) you can always register these classes yourself, despite the crazy names, though you'll just need to knock these out one-by-one: scala> classOf[scala.reflect.ClassTag$$anon$1] res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1: scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]): Class[_]}] = class scala.reflect.ClassTag$$anon$1 On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote: Hi, I am trying to register classes with KryoSerializer. This has worked with other programs. Usually the error messages are helpful in indicating which classes need to be registered. But with my current program, I get the following cryptic error message: *Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1* *Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);* How do I find out which class needs to be registered? I looked at my program and registered all classes used in RDDs. But clearly more classes remain to be registered, if only I can figure out which ones. Thanks for your help! arun
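The registration trick in (c) can be sketched like this -- a minimal example only, since the anonymous class can't be named directly in source that IntelliJ will accept, so we resolve it at runtime from the mangled string:

```scala
import org.apache.spark.SparkConf

// Sketch of option (c): resolve the anonymous inner class by its mangled name,
// since "ClassTag$$anon$1" cannot be written as a normal symbol in user code.
val conf = new SparkConf()
  .set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(
  Class.forName("scala.reflect.ClassTag$$anon$1")
))
```

The mangled name is exactly what the "Class is not registered" error message reports, so each failure can be knocked out by copying the name into another Class.forName call.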
Re: counters in spark
Hi Robert, A lot of task metrics are already available for individual tasks. You can get these programmatically by registering a SparkListener, and you can also view them in the UI. Eg., for each task, you can see runtime, serialization time, amount of shuffle data read, etc. I'm working on also exposing the data in the UI as json. In addition, you can also use the metrics system to get a different view of the data. It has a different set of information, and is also better for a timeline view, as opposed to the task-oriented view you get through the UI. You can read about both options here: https://spark.apache.org/docs/latest/monitoring.html On Mon, Apr 13, 2015 at 12:48 PM, Grandl Robert rgra...@yahoo.com.invalid wrote: Guys, Do you have any thoughts on this ? Thanks, Robert On Sunday, April 12, 2015 5:35 PM, Grandl Robert rgra...@yahoo.com.INVALID wrote: Hi guys, I was trying to figure out some counters in Spark, related to the amount of CPU or Memory used (in some metric) by a task/stage/job, but I could not find any. Is there any such counter available ? Thank you, Robert
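The SparkListener approach mentioned above can be sketched as follows (a minimal example against the Spark 1.x listener API; field names like executorRunTime and jvmGCTime are standard TaskMetrics fields):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Minimal sketch: print a few per-task metrics as each task finishes.
// `sc` is an in-scope SparkContext.
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"runtime=${m.executorRunTime}ms gc=${m.jvmGCTime}ms")
    }
  }
})
```

Instead of printing, a real listener would typically aggregate these into per-stage or per-job counters.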
Re: Registering classes with KryoSerializer
Hi Arun, It can be hard to use kryo with required registration because of issues like this -- there isn't a good way to register all the classes that you need transitively. In this case, it looks like one of your classes has a reference to a ClassTag, which in turn has a reference to some anonymous inner class. I'd suggest (a) figuring out whether you really want to be serializing this thing -- it's possible you're serializing an RDD which keeps a ClassTag, but normally you wouldn't want to serialize your RDDs (b) you might want to bring this up w/ chill -- spark offloads most of the kryo setup for all the scala internals to chill, I'm surprised they don't handle this already. Looks like they still handle ClassManifests, which are from pre-scala 2.10: https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189 (c) you can always register these classes yourself, despite the crazy names, though you'll just need to knock these out one-by-one: scala> classOf[scala.reflect.ClassTag$$anon$1] res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1: scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]): Class[_]}] = class scala.reflect.ClassTag$$anon$1 On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote: Hi, I am trying to register classes with KryoSerializer. This has worked with other programs. Usually the error messages are helpful in indicating which classes need to be registered. But with my current program, I get the following cryptic error message: *Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1* *Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);* How do I find out which class needs to be registered? I looked at my program and registered all classes used in RDDs. But clearly more classes remain to be registered, if only I can figure out which ones. Thanks for your help! arun
Re: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer
Hi Shuai, I don't think this is a bug with kryo, it's just a subtlety with how kryo works. I *think* that it would also work if you changed your PropertiesUtils class to either (a) remove the no-arg constructor or (b) instead of extending Properties, make it a contained member variable. I wish I had a succinct explanation, but I think it really gets into the nitty gritty details of how these serializers work (and this is just a hunch of mine anyway, I'm not 100% sure). Would be great if you could confirm either way. thanks, Imran On Tue, Apr 7, 2015 at 9:29 AM, Shuai Zheng szheng.c...@gmail.com wrote: I have found the issue, but I think it is a bug. If I change my class to: public class ModelSessionBuilder implements Serializable { … private Properties[] propertiesList; private static final long serialVersionUID = -8139500301736028670L; } The broadcast value has no issue. But in my original form, if I broadcast it as an array of my custom subclass of Properties, after broadcast the propertiesList array will be an array of empty PropertiesUtils objects (empty, not NULL). I am not sure why this happens (the code runs without any problem with the default java serializer). So I think this is a bug, but I am not sure whether it is a bug of spark or a bug of Kryo. Regards, Shuai *From:* Shuai Zheng [mailto:szheng.c...@gmail.com] *Sent:* Monday, April 06, 2015 5:34 PM *To:* user@spark.apache.org *Subject:* Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer Hi All, I have tested my code without problem on EMR yarn (spark 1.3.0) with the default serializer (java). But when I switch to org.apache.spark.serializer.KryoSerializer, the broadcast value doesn't give me the right result (it actually returns empty custom classes on the inner object). Basically I broadcast a builder object, which carries an array of PropertiesUtils objects. The code should not have any logical issue because it works with the default java serializer.
But when I turn to the org.apache.spark.serializer.KryoSerializer, it looks like the array doesn't initialize: propertiesList has the right size, but every element in the array is just an empty PropertiesUtils. Do I miss anything when I use this KryoSerializer? I just put in the two lines below; do I need to implement some special code to enable KryoSerializer? I searched but can't find any place that mentions it.

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.registerKryoClasses(new Class[]{ModelSessionBuilder.class, Constants.class, PropertiesUtils.class, ModelSession.class});

public class ModelSessionBuilder implements Serializable {
  …
  private PropertiesUtils[] propertiesList;
  private static final long serialVersionUID = -8139500301736028670L;
}

public class PropertiesUtils extends Properties {
  private static final long serialVersionUID = -3684043338580885551L;
  public PropertiesUtils(Properties prop) {
    super(prop);
  }
  public PropertiesUtils() {
    // TODO Auto-generated constructor stub
  }
}

Regards, Shuai
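Suggestion (b) above -- composition instead of inheritance -- can be sketched as follows. This is only an illustration of the restructuring, not a confirmed fix; the idea is that kryo then serializes a plain field rather than the internal Hashtable state of a Properties subclass:

```scala
import java.util.Properties

// Sketch of option (b): hold the Properties as a member instead of extending it,
// so the serializer sees an ordinary field rather than Hashtable internals.
class PropertiesHolder(val props: Properties) extends Serializable {
  def this() = this(new Properties())
  def get(key: String): String = props.getProperty(key)
}
```

If this version round-trips correctly through kryo while the subclass does not, that would support the hunch that the no-arg constructor plus the Properties superclass is what trips kryo up.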
Re: Array[T].distinct doesn't work inside RDD
Interesting, my gut instinct is the same as Sean's. I'd suggest debugging this in plain old scala first, without involving spark. Even just in the scala shell, create one of your Array[T], try calling .toSet and calling .distinct. If those aren't the same, then it's got nothing to do with spark. If it's still different even after you make hashCode() consistent w/ equals(), then you might have more luck asking on the scala-user list: https://groups.google.com/forum/#!forum/scala-user If it works fine in plain scala, but not in spark, then it would be worth bringing up here again for us to look into. On Tue, Apr 7, 2015 at 4:41 PM, Anny Chen anny9...@gmail.com wrote: Hi Sean, I didn't override hashCode. But the problem is that Array[T].toSet could work but Array[T].distinct couldn't. If it is because I didn't override hashCode, then toSet shouldn't work either, right? I also tried using this Array[T].distinct outside RDD, and it is working alright also, returning me the same result as Array[T].toSet. Thanks! Anny On Tue, Apr 7, 2015 at 2:31 PM, Sean Owen so...@cloudera.com wrote: Did you override hashCode too? On Apr 7, 2015 2:39 PM, anny9699 anny9...@gmail.com wrote: Hi, I have a question about Array[T].distinct on a customized class T. My data is like RDD[(String, Array[T])] in which T is a class written by myself. There are some duplicates in each Array[T] so I want to remove them. I override the equals() method in T and use val dataNoDuplicates = dataDuplicates.map { case (id, arr) => (id, arr.distinct) } to remove duplicates inside the RDD. However this doesn't work, as I found with some further tests using val dataNoDuplicates = dataDuplicates.map { case (id, arr) => val uniqArr = arr.distinct; if (uniqArr.length > 1) println(uniqArr.head == uniqArr.last); (id, uniqArr) } And from the worker stdout I could see that it always prints TRUE. I then tried removing duplicates by using Array[T].toSet instead of Array[T].distinct and it is working!
Could anybody explain why the Array[T].toSet and Array[T].distinct behaves differently here? And Why is Array[T].distinct not working? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Array-T-distinct-doesn-t-work-inside-RDD-tp22412.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
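A minimal, standalone illustration (plain Scala, no Spark) of the debugging suggestion above -- a class with equals() overridden but hashCode() not, whose behavior under hash-based operations is then unreliable:

```scala
// Two instances that are equal by equals() but (almost certainly) have
// different default identity hashCodes -- the bug under discussion.
class Item(val id: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: Item => this.id == that.id
    case _          => false
  }
  // hashCode deliberately NOT overridden.
}

val arr = Array(new Item(1), new Item(1))
// Both distinct and toSet go through hash-based lookup, so whether the
// "duplicates" collapse depends on hash buckets, not on equals() alone.
println(arr.distinct.length)
println(arr.toSet.size)
```

With hashCode() consistent with equals() (e.g. `override def hashCode: Int = id`), both operations reliably collapse the duplicates to one element.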
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
Shuffle write could be a good indication of skew, but it looks like the task in question hasn't generated any shuffle write yet, because it's still working on the shuffle-read side. So I wouldn't read too much into the fact that the shuffle write is 0 for a task that is still running. The shuffle read is larger than for the other tasks (3.0 GB vs. 2.2 GB, or more importantly, 55M records vs 1M records). So it might not be that the raw data volume is much higher on that task, but it's getting a ton more small records, which will also generate a lot of work. It also is a little more evidence for Jonathan's suggestion that there is a null / 0 record that is getting grouped together. On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney jcove...@gmail.com wrote: I'm not 100% sure of spark's implementation but in the MR frameworks, it would have a much larger shuffle write size because that node is dealing with a lot more data and as a result has a lot more to shuffle 2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com: If it is really due to data skew, will the hanging task have a much bigger Shuffle Write Size in this case? In this case, the shuffle write size for that task is 0, and the rest of the IO of this task is not much larger than the fast-finished tasks; is that normal? I am also interested in this case: from the statistics on the UI, how does it indicate that the task could have skewed data? Yong -- Date: Mon, 13 Apr 2015 12:58:12 -0400 Subject: Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete From: jcove...@gmail.com To: deepuj...@gmail.com CC: user@spark.apache.org I can promise you that this is also a problem in the pig world :) not sure why it's not a problem for this data set, though... are you sure that the two are doing the exact same code? you should inspect your source data. Make a histogram for each and see what the data distribution looks like.
If there is a value or bucket with a disproportionate set of values, you know you have an issue 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: You mean there is a tuple in either RDD that has itemID = 0 or null? And what is a catch-all? That implies: is it a good idea to run a filter on each RDD first? We do not do this using Pig on M/R. Is it required in the Spark world? On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote: My guess would be data skew. Do you know if there is some item id that is a catch-all? Can it be null? Item id 0? Lots of data sets have this sort of value and it always kills joins 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Code:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map { case (itemId, (listing, viDetail)) =>
    val viSummary = new VISummary
    viSummary.leafCategoryId = listing.getLeafCategId().toInt
    viSummary.itemSiteId = listing.getItemSiteId().toInt
    viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
    viSummary.sellerCountryId = listing.getSlrCntryId().toInt
    viSummary.buyerSegment = 0
    viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
    val sellerId = listing.getSlrId.toLong
    (sellerId, (viDetail, viSummary, itemId))
  }

Running Tasks:
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921 | | 0.0 B / 0 | 21.2 GB | 1902.6 MB
2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851 | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB
1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086 | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB
4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654 | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB

Command:
./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 3000 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
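The "make a histogram" suggestion above can be sketched directly on the join input. This assumes `lstgItem` is the pair RDD from the code in this thread; the top keys by count will immediately reveal a null / 0 / catch-all key if one exists:

```scala
// Hedged sketch: find the heaviest join keys before running the join.
// Top 10 keys by record count -- a hugely dominant key indicates skew.
val topKeys = lstgItem
  .map { case (itemId, _) => (itemId, 1L) }
  .reduceByKey(_ + _)
  .top(10)(Ordering.by(_._2))

topKeys.foreach { case (k, n) => println(s"key=$k count=$n") }
```

Running the same thing on `viEvents` and comparing the two histograms shows which side of the join carries the skewed key.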
Re: Catching executor exception from executor in driver
(+dev) Hi Justin, short answer: no, there is no way to do that. I'm just guessing here, but I imagine this was done to eliminate serialization problems (eg., what if we got an error trying to serialize the user exception to send from the executors back to the driver?). Though, actually that isn't a great explanation either, since even when the info gets back to the driver, it's broken into a few string fields (eg., we have the class name of the root exception), but eventually it just gets converted to one big string. I've cc'ed dev b/c I think this is an oversight in Spark. It makes it really hard to write an app to deal gracefully with various exceptions -- all you can do is look at the string in SparkException (which could change arbitrarily between versions, in addition to just being a pain to work with). We should probably add much more fine-grained subclasses of SparkException, at the very least distinguishing errors in user code vs. errors in spark. I could imagine there might be a few other cases we'd like to distinguish more carefully as well. Any thoughts from other devs? thanks Imran On Tue, Apr 14, 2015 at 4:46 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to know if there is a way of catching an exception thrown from an executor in the driver program. Here is an example:

try {
  val x = sc.parallelize(Seq(1, 2, 3)).map(e => e / 0).collect
} catch {
  case e: SparkException => {
    println(s"ERROR: $e")
    println(s"CAUSE: ${e.getCause}")
  }
}

Output: ERROR: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 15, pio1.c.ace-lotus-714.internal): java.lang.ArithmeticException: / by zero at $line71.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(console:51) ... CAUSE: null The exception cause is a null value. Is there any way that I can catch the ArithmeticException?
Thanks Justin -- View this message in context: Catching executor exception from executor in driver http://apache-spark-user-list.1001560.n3.nabble.com/Catching-executor-exception-from-executor-in-driver-tp22495.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
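Given the limitation discussed above, the only workaround today is inspecting the SparkException's message string. A hedged sketch of that (explicitly fragile, since the message format could change between versions, as noted):

```scala
import org.apache.spark.SparkException

// Fragile workaround: classify the failure by searching the exception message
// for the root-cause class name, since getCause is null here.
try {
  sc.parallelize(Seq(1, 2, 3)).map(e => e / 0).collect()
} catch {
  case e: SparkException if e.getMessage.contains("ArithmeticException") =>
    println("arithmetic error in user code (detected by message inspection)")
  case e: SparkException =>
    println(s"other spark failure: ${e.getMessage}")
}
```

This is exactly the kind of string matching that fine-grained SparkException subclasses would make unnecessary.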
Re: Task result in Spark Worker Node
On the worker side, it all happens in Executor. The task result is computed here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210 then it's serialized along with some other goodies, and finally sent back to the driver here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255 What happens on the driver is quite a bit more complicated, and involves a number of spots in the code, but at least to get you started, the results are received here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328 though perhaps a more interesting spot is where they are handled in DAGScheduler#handleTaskCompletion: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001 also, I think I know what you mean, but just to make sure: I wouldn't say the results from the worker are broadcast back to the driver. (a) in spark, broadcast tends to refer to a particular api for sharing immutable data from the driver to the workers (only one direction) and (b) it doesn't really fit a more general meaning of broadcast either, since the results are sent only to the driver, not to all nodes. On Sun, Mar 29, 2015 at 8:34 PM, raggy raghav0110...@gmail.com wrote: I am a PhD student working on a research project related to Apache Spark. I am trying to modify some of the spark source code such that instead of sending the final result RDD from the worker nodes to a master node, I want to send the final result RDDs to some different node. In order to do this, I have been trying to identify at which point the Spark worker nodes broadcast the results of a job back to the master.
So far, I understand that in Spark, the master serializes the RDD and the functions to be applied on them and sends them over to the worker nodes. In the context of reduce, it serializes the RDD partition and the reduce function and sends them to the worker nodes. However, my understanding of how things happen at the worker node is very limited and I would appreciate it if someone could help me identify where the process of broadcasting the results of local worker computations back to the master node takes place. This is some of the limited knowledge that I have about the worker nodes: Each job gets divided into smaller sets of tasks called stages. Each stage is either a Shuffle Map Stage or a Result Stage. In a Shuffle Map Stage, the task results are used as input for another stage. The result stage uses the RDD to compute the action that initiated the job. So, this result stage executes the last task for the job on the worker node. I would assume after this is done, it gets the result and broadcasts it to the driver application (the master). In ResultTask.scala (spark-core src/main/scala org.apache.spark.scheduler) it states "A task that sends back the output to the driver application." However, I don't see when or where this happens in the source code. I would very much appreciate it if someone could help me identify where this happens in the Spark source code. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-result-in-Spark-Worker-Node-tp22283.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Registering classes with KryoSerializer
Those funny class names come from scala's specialization -- it's compiling a different version of OpenHashMap for each primitive you stick in the type parameter. Here's a super simple example:

➜ ~ more Foo.scala
class Foo[@specialized X]
➜ ~ scalac Foo.scala
➜ ~ ls Foo*.class
Foo$mcB$sp.class Foo$mcC$sp.class Foo$mcD$sp.class Foo$mcF$sp.class Foo$mcI$sp.class
Foo$mcJ$sp.class Foo$mcS$sp.class Foo$mcV$sp.class Foo$mcZ$sp.class Foo.class

Sadly, I'm not sure of a foolproof way of getting all those specialized versions registered except for registering with these strange names. Here's an example of how it's done by chill for Tuples (which is what spark is relying on for its own registration of tuples): https://github.com/twitter/chill/blob/6d03f6976f33f6e2e16b8e254fead1625720c281/chill-scala/src/main/scala/com/twitter/chill/TupleSerializers.scala#L861 On Mon, Mar 30, 2015 at 3:59 PM, Arun Lists lists.a...@gmail.com wrote: I am trying to register classes with KryoSerializer. I get the following error message: How do I find out what class is being referred to by: OpenHashMap$mcI$sp ? com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: com.comp.common.base.OpenHashMap$mcI$sp Note: To register this class use: kryo.register(com.dtex.common.base.OpenHashMap$mcI$sp.class); I have registered other classes with it by using: sparkConf.registerKryoClasses(Array( classOf[MyClass] )) Thanks, arun
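Registering the specialized variants by their mangled names can be sketched as below. This is a speculative helper, not a confirmed foolproof fix: the base class name comes from the error message in this thread, the suffix letters follow the `$mc<X>$sp` naming shown above, and variants that weren't generated are simply skipped:

```scala
import org.apache.spark.SparkConf

// Hedged sketch: register a specialized class plus every primitive variant
// that actually exists on the classpath, by mangled name.
val base = "com.comp.common.base.OpenHashMap"
val suffixes = Seq("B", "C", "D", "F", "I", "J", "S", "V", "Z")
val candidates = base +: suffixes.map(s => s"$base$$mc$s$$sp")

val conf = new SparkConf()
conf.registerKryoClasses(
  candidates.flatMap { name =>
    // Not every specialization is emitted, so tolerate missing classes.
    try Some(Class.forName(name))
    catch { case _: ClassNotFoundException => None }
  }.toArray
)
```

Note this only covers a single specialized type parameter; classes specialized on two parameters get longer mangled names (e.g. `$mcII$sp`), as the chill Tuple registration linked above shows.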
Re: How to get rdd count() without double evaluation of the RDD?
yes, it sounds like a good use of an accumulator to me:

val counts = sc.accumulator(0L)
rdd.map { x =>
  counts += 1
  x
}.saveAsObjectFile("file2")

On Mon, Mar 30, 2015 at 12:08 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Sean Yes I know that I can use persist() to persist to disk, but it is still a big extra cost to persist a huge RDD to disk. I hope that I can do one pass to get the count as well as rdd.saveAsObjectFile("file2"), but I don't know how. Maybe use an accumulator to count the total? Ningjun *From:* Mark Hamstra [mailto:m...@clearstorydata.com] *Sent:* Thursday, March 26, 2015 12:37 PM *To:* Sean Owen *Cc:* Wang, Ningjun (LNG-NPV); user@spark.apache.org *Subject:* Re: How to get rdd count() without double evaluation of the RDD? You can also always take the more extreme approach of using SparkContext#runJob (or submitJob) to write a custom Action that does what you want in one pass. Usually that's not worth the extra effort. On Thu, Mar 26, 2015 at 9:27 AM, Sean Owen so...@cloudera.com wrote: To avoid computing twice you need to persist the RDD, but that need not be in memory. You can persist to disk with persist(). On Mar 26, 2015 4:11 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have an rdd that is expensive to compute. I want to save it as an object file and also print the count. How can I avoid double computation of the RDD? val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line)) val count = rdd.count() // this forces computation of the rdd println(count) rdd.saveAsObjectFile("file2") // this computes the RDD again I can avoid double computation by using cache: val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line)) rdd.cache() val count = rdd.count() println(count) rdd.saveAsObjectFile("file2") This only computes the rdd once. However the rdd has millions of items and will cause out of memory. Question: how can I avoid double computation without using cache? Ningjun
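One caveat worth noting about the accumulator approach: accumulators updated inside transformations can over-count if a task is retried or a stage is re-run, so the count should be treated as approximate unless the job ran cleanly. Sean's disk-persist alternative avoids that, and avoids the OOM from cache(), at the cost of one disk write. A sketch (`expensiveCalculation` is the function from the question above):

```scala
import org.apache.spark.storage.StorageLevel

// Alternative sketch: persist to disk instead of memory, then count and save.
// The RDD is computed once; count() materializes it, saveAsObjectFile reads
// the persisted blocks back rather than recomputing.
val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line))
rdd.persist(StorageLevel.DISK_ONLY)
println(rdd.count())
rdd.saveAsObjectFile("file2")
rdd.unpersist()
```

Whether the accumulator (one pass, approximate under retries) or disk persistence (two disk writes, exact) is better depends on how expensive the computation is relative to the I/O.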
Re: Understanding Spark Memory distribution
broadcast variables count towards spark.storage.memoryFraction, so they use the same pool of memory as cached RDDs. That being said, I'm really not sure why you are running into problems; it seems like you have plenty of memory available. Most likely it's got nothing to do with broadcast variables or caching -- it's just whatever logic you are applying in your transformations that is causing lots of GC to occur during the computation. Hard to say without knowing more details. You could try increasing the timeout for the failed askWithReply by increasing spark.akka.lookupTimeout (defaults to 30), but that would most likely be treating a symptom, not the root cause. On Fri, Mar 27, 2015 at 4:52 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi All, I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have given 26gb of memory with all 8 cores to my executors. I can see that in the logs too: 15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added: app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128 (10.x.y.z:40128) with 8 cores I am not caching any RDD so I have set spark.storage.memoryFraction to 0.2. I can see on SparkUI under the executors tab Memory used is 0.0/4.5 GB. I am now confused by these logs: 15/03/27 21:31:08 INFO BlockManagerMasterActor: Registering block manager 10.77.100.196:58407 with 4.5 GB RAM, BlockManagerId(4, 10.x.y.z, 58407) I am broadcasting a large object of 3 gb and after that when I am creating an RDD, I see logs which show this 4.5 GB memory getting full and then I get OOM. How can I make the block manager use more memory? Is there any other fine tuning I need to do for broadcasting large objects? And does a broadcast variable use cache memory or the rest of the heap? Thanks Ankur
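The two settings discussed above can be adjusted together; a hedged sketch (the values are illustrative, and as noted the timeout bump treats a symptom rather than the cause):

```scala
import org.apache.spark.SparkConf

// Sketch of the knobs from this thread: broadcasts share the storage pool
// with cached RDDs, so raising memoryFraction gives broadcasts more room;
// lookupTimeout is the stopgap for the failed askWithReply.
val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.4")  // default 0.6 pre-shrunk to 0.2 in the thread
  .set("spark.akka.lookupTimeout", "120")      // seconds; default 30
```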
Re: Serialization Problem in Spark Program
you also need to register *array*s of MyObject. so change: conf.registerKryoClasses(Array(classOf[MyObject])) to conf.registerKryoClasses(Array(classOf[MyObject], classOf[Array[MyObject]])) On Wed, Mar 25, 2015 at 2:44 AM, donhoff_h 165612...@qq.com wrote: Hi, experts I wrote a very simple spark program to test the KryoSerialization function. The codes are as following:

object TestKryoSerialization {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "true") // I use this statement to force checking registration.
    conf.registerKryoClasses(Array(classOf[MyObject]))
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("hdfs://dhao.hrb:8020/user/spark/tstfiles/charset/A_utf8.txt")
    val objs = rdd.map(new MyObject(_, 1)).collect()
    for (x <- objs) {
      x.printMyObject
    }
  }
}

The class MyObject is also a very simple class, which is only used to test the serialization function:

class MyObject {
  var myStr: String = ""
  var myInt: Int = 0
  def this(inStr: String, inInt: Int) {
    this()
    this.myStr = inStr
    this.myInt = inInt
  }
  def printMyObject {
    println("MyString is : " + myStr + "\tMyInt is : " + myInt)
  }
}

But when I ran the application, it reported the following error: java.lang.IllegalArgumentException: Class is not registered: dhao.test.Serialization.MyObject[] Note: To register this class use: kryo.register(dhao.test.Serialization.MyObject[].class); at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442) at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:161) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I don't understand what causes this problem. I have used conf.registerKryoClasses to register my class. Could anyone help me? Thanks. By the way, the spark version is 1.3.0.
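When many classes need both forms registered, the pattern above can be factored into a small helper. This is a hypothetical convenience (not part of Spark's API), using reflection to derive each array class:

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: register each class together with its array class,
// since kryo treats MyObject[] as a distinct type needing its own registration.
def registerWithArrays(conf: SparkConf, classes: Class[_]*): SparkConf =
  conf.registerKryoClasses(classes.flatMap { c =>
    Seq(c, java.lang.reflect.Array.newInstance(c, 0).getClass)
  }.toArray)

// Usage sketch:
// registerWithArrays(new SparkConf(), classOf[MyObject])
```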
Re: spark disk-to-disk
I think writing to hdfs and reading it back again is totally reasonable. In fact, in my experience, writing to hdfs and reading back in actually gives you a good opportunity to handle some other issues as well: a) instead of just writing as an object file, I've found it's helpful to write in a format that is a little more readable. JSON, if efficiency doesn't matter :) or you could use something like avro, which at least has a good set of command line tools. b) when developing, I hate it when I introduce a bug in step 12 of a long pipeline, and need to re-run the whole thing. If you save to disk, you can write a little application logic that realizes step 11 is already sitting on disk, and just restart from there. c) writing to disk is also a good opportunity to do a little crude auto-tuning of the number of partitions. You can look at the size of each partition on hdfs, and then adjust the number of partitions. And I completely agree that losing the partitioning info is a major limitation -- I submitted a PR to help deal w/ it: https://github.com/apache/spark/pull/4449 getting narrow dependencies w/ partitioners can lead to pretty big performance improvements, so I do think it's important to make it easily accessible to the user. Though now I'm thinking that maybe this api is a little clunky, and this should get rolled into the other changes you are proposing to hadoop RDD & friends -- but I'll go into more discussion on that thread. On Mon, Mar 23, 2015 at 12:55 PM, Koert Kuipers ko...@tresata.com wrote: there is a way to reinstate the partitioner, but that requires sc.objectFile to read exactly what i wrote, which means sc.objectFile should never split files on reading (a feature of hadoop file inputformat that gets in the way here). On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers ko...@tresata.com wrote: i just realized the major limitation is that i lose partitioning info...
On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin r...@databricks.com wrote: On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com wrote: so finally i can resort to: rdd.saveAsObjectFile(...) sc.objectFile(...) but that seems like a rather broken abstraction. This seems like a fine solution to me.
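The save-and-restart idea in point (b) above can be sketched as a small helper. This is a hypothetical utility, not from the thread; it checks HDFS for an existing output before recomputing:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper for point (b): reuse an on-disk result if it already
// exists, otherwise compute it, save it, and return it.
def saveOrLoad[T: ClassTag](sc: SparkContext, path: String)(compute: => RDD[T]): RDD[T] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  if (fs.exists(new Path(path))) {
    sc.objectFile[T](path) // step already done: restart from disk
  } else {
    val rdd = compute
    rdd.saveAsObjectFile(path)
    rdd
  }
}
```

Note the caveat Koert raises above still applies: reading back via sc.objectFile does not reinstate any partitioner, so downstream stages may reshuffle.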
Re: ShuffleBlockFetcherIterator: Failed to get block(s)
I think you should see some other errors before that, from NettyBlockTransferService, with a msg like Exception while beginning fetchBlocks. There might be a bit more information there. There is an assortment of possible causes, but first let's just make sure you have all the details from the original cause. On Fri, Mar 20, 2015 at 8:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: My job crashes with a bunch of these messages in the YARN logs. What are the appropriate steps in troubleshooting? 15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 10 outstanding blocks (after 3 retries) 15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get block(s) from host:port
Re: Error communicating with MapOutputTracker
Hi Thomas, sorry for such a late reply. I don't have any super-useful advice, but this seems like something that is important to follow up on. To answer your immediate question: no, there should not be any hard limit to the number of tasks that MapOutputTracker can handle. Though of course as things get bigger, the overheads increase, which is why you might hit timeouts. Two other minor suggestions: (1) increase spark.akka.askTimeout -- that's the timeout you are running into; it defaults to 30 seconds (2) as you've noted, you've needed to play w/ other timeouts b/c of long GC pauses -- it's possible some GC tuning might help, though it's a bit of a black art so it's hard to say what you can try. You could always try Concurrent Mark Sweep to avoid the long pauses, but of course that will probably hurt overall performance. Can you share any more details of what you are trying to do? Since you're fetching shuffle blocks in a shuffle map task, I guess you've got two shuffles back-to-back, eg. someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}. Do you expect to be doing a lot of GC in between the two shuffles? -- eg., in the little example I have, if there were lots of objects being created in the map / filter steps that will make it out of the eden space. One possible solution to this would be to force the first shuffle to complete, before running any of the subsequent transformations, eg. by forcing materialization to the cache first:

val intermediateRDD = someRDD.reduceByKey{...}.persist(DISK)
intermediateRDD.count() // force the shuffle to complete, without trying to do our complicated downstream logic at the same time
val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

Also, can you share your data size? Do you expect the shuffle to be skewed, or do you think it will be well-balanced? Not that I'll have any suggestions for you based on the answer, but it may help us reproduce it and try to fix whatever the root cause is.
thanks,
Imran

On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber thomas.ger...@radius.com wrote:

I meant spark.default.parallelism of course.

On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com wrote:

Follow up: we re-tried, this time after *decreasing* spark.parallelism. It was set to 16000 before (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores). And it got past the point where it failed before. Does the MapOutputTracker have a limit on the number of tasks it can track?

On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com wrote:

Hello, we are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error, which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker

We tried the whole application again, and it failed on the same stage (but it got more tasks completed on that stage) with the same error. We then looked at the executors' stderr, and all show similar logs, on both runs (see below). As far as we can tell, executors and master have disk space left.
*Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?*

Thanks,
Thomas

In case it matters, our akka settings:

spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10x the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO
Re: FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded
I think you are running into a combo of https://issues.apache.org/jira/browse/SPARK-5928 and https://issues.apache.org/jira/browse/SPARK-5945

The standard solution is to just increase the number of partitions you are creating. textFile(), reduceByKey(), and sortByKey() all take an optional second argument where you can specify the number of partitions to use. It looks like it's using spark.default.parallelism right now, which will usually be the number of cores in your cluster (not sure what that is in your case). The exception you gave shows you're about 6x over the limit in at least this one case, so I'd start with at least 10x the number of partitions you have now, and increase until it works (or you run into some other problem from too many partitions...).

I'd also strongly suggest doing the filter before you do the sortByKey -- no reason to force all that data through the sort if you're going to throw a lot of it away. It's not completely clear where you are hitting the error now -- that alone might even solve your problem.

hope this helps,
Imran

On Thu, Mar 19, 2015 at 5:28 PM, roni roni.epi...@gmail.com wrote:

I get 2 types of error -- org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 and FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded. Spark keeps re-trying to submit the code and keeps getting this error. My file on which I am finding the sliding window strings is 500 MB and I am doing it with length = 150. It works fine till length is 100.
This is my code -

val hgfasta = sc.textFile(args(0)) // read the fasta file
val kCount = hgfasta.flatMap(r => { r.sliding(args(2).toInt) })
val kmerCount = kCount.map(x => (x, 1)).reduceByKey(_ + _).map { case (x, y) => (y, x) }.sortByKey(false).map { case (i, j) => (j, i) }
val filtered = kmerCount.filter(kv => kv._2 > 5)
filtered.map(kv => kv._1 + "," + kv._2.toLong).saveAsTextFile(args(1))
}

It gets stuck at flatMap and saveAsTextFile, and throws -

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

and

org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
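Putting Imran's two suggestions together, here is a hedged, untested sketch of the same pipeline with explicit partition counts and the filter moved before the sort (the count of 10000 is purely illustrative, not a recommendation -- tune it for your cluster):

```scala
// Sketch only: assumes the same args as the original program.
val numPartitions = 10000 // illustrative: start ~10x the current count
val hgfasta = sc.textFile(args(0), numPartitions)
val counts = hgfasta
  .flatMap(_.sliding(args(2).toInt))
  .map(kmer => (kmer, 1))
  .reduceByKey(_ + _, numPartitions)   // explicit partition count for the shuffle
  .filter(_._2 > 5)                    // filter *before* sorting, so the sort moves less data
val sorted = counts.map(_.swap).sortByKey(ascending = false, numPartitions).map(_.swap)
sorted.map { case (kmer, c) => kmer + "," + c }.saveAsTextFile(args(1))
```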
Re: Why I didn't see the benefits of using KryoSerializer
Hi Yong, yes, I think your analysis is correct. I'd imagine almost all serializers out there will just convert a string to its UTF-8 representation. You might be interested in adding compression on top of a serializer, which would probably bring the string size down in almost all cases, but then you also need to take the time for compression. Kryo is generally more efficient than the Java serializer on complicated object types. I guess I'm still a little surprised that kryo is slower than java serialization for you. You might try setting spark.kryo.referenceTracking to false if you are just serializing objects with no circular references. I think that will improve the performance a little, though I dunno how much. It might be worth running your experiments again with slightly more complicated objects and seeing what you observe.

Imran

On Thu, Mar 19, 2015 at 12:57 PM, java8964 java8...@hotmail.com wrote:

I read the Spark code a little bit, trying to understand my own question. It looks like the difference is really between org.apache.spark.serializer.JavaSerializer and org.apache.spark.serializer.KryoSerializer, both having a method named writeObject. In my test case, each line of my text file is about 140 bytes of String. With either JavaSerializer.writeObject(140 bytes of String) or KryoSerializer.writeObject(140 bytes of String), I didn't see a difference in the underlying OutputStream space usage. Does this mean that KryoSerializer really doesn't give us any benefit for the String type? I understand that for primitive types it shouldn't have any benefits, but how about String? When we talk about lowering memory usage with KryoSerializer in Spark, in what cases can it bring significant benefits? This is my first experience with the KryoSerializer, so maybe I am totally wrong about its usage.
Thanks,
Yong

--
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Why I didn't see the benefits of using KryoSerializer
Date: Tue, 17 Mar 2015 12:01:35 -0400

Hi, I am new to Spark. I tried to understand the memory benefits of using KryoSerializer. I have this one-box standalone test environment, which has 24 cores with 24G memory. I installed Hadoop 2.2 plus Spark 1.2.0. I put one text file in HDFS, about 1.2G. Here are the settings in spark-env.sh:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=4g

First test case:

val log=sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
log.count()
log.count()

The data is about 3M rows. For the first test case, from the Storage tab in the web UI, I can see Size in Memory is 1787M, and Fraction Cached is 70% with 7 cached partitions. This matched what I expected; the first count finished in about 17s, and the 2nd count in about 6s.

2nd test case, after restarting the spark-shell:

val log=sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()

Now from the web UI, I can see Size in Memory is 1231M, and Fraction Cached is 100% with 10 cached partitions. It looks like caching the default Java serialized format reduces the memory usage, but comes with a cost: the first count finished in around 39s and the 2nd count in around 9s. So the job runs slower, with less memory usage. So far I can understand everything that happened and the tradeoff.
Now the problem comes when I test with KryoSerializer:

SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" /opt/spark/bin/spark-shell

val log=sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()

First, I saw that the new serializer setting was passed in: the Spark Properties section of the Environment tab shows spark.driver.extraJavaOptions -Dspark.serializer=org.apache.spark.serializer.KryoSerializer, which is not there for the first 2 test cases. But in the Storage tab of the web UI, Size in Memory is 1234M, with 100% Fraction Cached and 10 cached partitions. The first count took 46s and the 2nd count took 23s. I don't get a much smaller memory size as I expected, but longer run times for both counts. Did I do anything wrong? Why does the memory footprint of MEMORY_ONLY_SER for KryoSerializer stay the same size as the default Java serializer, with worse duration?

Thanks,
Yong
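A hedged side note (my reading, not confirmed in the thread): SPARK_JAVA_OPTS here only surfaced as spark.driver.extraJavaOptions, so the executors may never have picked up Kryo at all. Passing the setting as a Spark conf applies it to the driver and executors alike, e.g.:

```shell
# Sketch: set the serializer as a Spark property instead of a driver-only JVM option.
# The referenceTracking line is optional, per Imran's suggestion above.
/opt/spark/bin/spark-shell \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.referenceTracking=false
```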
Re: Spark will process _temporary folder on S3 is very slow and always cause failure
I'm not super familiar w/ S3, but I think the issue is that you want to use a different output committer with object stores, which don't have a simple move operation. There have been a few other threads on S3 output committers. I think the most relevant for you is probably this open JIRA: https://issues.apache.org/jira/browse/SPARK-6352

On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All, I try to run a sorting on an r3.2xlarge instance on AWS. I just run it as a single-node cluster for test. The data I sort is around 4GB and sits on S3; the output will also go to S3. I just connect spark-shell to the local cluster and run the code in the script (because I just want a benchmark now). My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...")
parquetFile.registerTempTable("Test")
val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }
sortedResult.saveAsTextFile("s3n://myplace")

The job takes around 6 mins to finish the sort when I monitor the process. Then I notice the process stops at:

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at console:31, took 581.304992 s

At that point, Spark has actually just written all the data to the _temporary folder first; after all sub-tasks finish, it tries to move all the ready results from the _temporary folder to the final location. This process might be quick locally (because it is just a cut/paste), but it looks very slow on my S3: it takes a few seconds to move one file (usually there will be 200 partitions). And then it raises exceptions after it has moved maybe 40-50 files.
org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

I tried several times, but never got the full job to finish. I am not sure anything is wrong here; I use something very basic, and I can see the job has finished with all results on S3 under the temporary folder, but then it raises the exception and fails. Any special setting I should use when dealing with S3? I don't know what the issue is here; I never saw MapReduce have a similar issue, so it should not be S3's problem.

Regards,
Shuai
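One hedged addition beyond the JIRA link above (an assumption, since it requires a Hadoop release that ships it, 2.7 or later): the v2 FileOutputCommitter algorithm commits task output directly to the destination, skipping the slow sequential rename pass out of _temporary that this job is stuck in. The jar name below is a placeholder:

```shell
# Sketch: use the v2 commit algorithm to avoid the job-commit rename pass
# (requires Hadoop 2.7+; note it weakens failure-atomicity guarantees).
spark-submit \
  --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
  my-sort-job.jar
```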
Re: Need Advice about reading lots of text files
Interesting -- on another thread, I was just arguing that the user should *not* open the files themselves and read them, b/c then they lose all the other goodies we have in HadoopRDD, eg. the metric tracking. I think this strengthens Pat's argument that we might actually need better support for this in the spark context itself?

On Sat, Mar 14, 2015 at 1:11 PM, Michael Armbrust mich...@databricks.com wrote:

Here is how I have dealt with many small text files (on S3, though this should generalize) in the past: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E

From: Michael Armbrust mich...@databricks.com
Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
Date: Thu, 27 Nov 2014 03:20:14 GMT

In the past I have worked around this problem by avoiding sc.textFile(). Instead I read the data directly inside of a Spark job. Basically, you start with an RDD where each entry is a file in S3 and then flatMap that with something that reads the files and returns the lines. Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe

Using this class you can do something like:

sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file2" ... :: Nil).flatMap(new ReadLinesSafe(_))

You can also build up the list of files by running a Spark job: https://gist.github.com/marmbrus/15e72f7bc22337cf6653

Michael

On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com wrote:

It's a long story, but there are many dirs with smallish part- files in them, so we create a list of the individual files as input to sparkContext.textFile(fileList). I suppose we could move them and rename them to be contiguous part- files in one dir. Would that be better than passing in a long list of individual filenames? We could also make the part files much larger by collecting the smaller ones. But would any of this make a difference in IO speed?
I ask because using the long file list seems to read what amounts to a not very large data set rather slowly. If it were all in large part files in one dir I'd expect it to go much faster, but this is just intuition.

On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote:

why can you not put them in a directory and read them as one input? you will get a task per file, but spark is very fast at executing many tasks (it's not a jvm per task).

On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote:

Any advice on dealing with a large number of separate input files?

On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

We have many text files that we need to read in parallel. We can create a comma-delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 1) and is all on hdfs. The question is: what is the most performant way to read them? Should they be broken up and read in groups, appending the resulting RDDs, or should we just pass in the entire list at once? In effect I'm asking whether Spark does some optimization, or whether we should do it explicitly. If the latter, what rule might we use depending on our cluster setup?

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
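A hedged alternative not raised in this thread (so treat its fit as an assumption; the path and partition count are illustrative): for directories of many small files, sc.wholeTextFiles reads each file as a (path, content) pair while staying inside the Hadoop input machinery that Imran wants to preserve:

```scala
// Sketch: read many small files as (filename, content) pairs, then split to lines.
// Suitable for small files only -- each file's content is held as a single String.
val lines = sc.wholeTextFiles("hdfs:///data/small-files/", minPartitions = 100)
  .flatMap { case (path, content) => content.split("\n") }
```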
Re: Process time series RDD after sortByKey
Hi Shuai, yup, that is exactly what I meant -- implement your own class MyGroupingRDD. This is definitely more detail than most users will ever need to go into, but it's also not all that scary either. In this case, you want something that is *extremely* close to the existing CoalescedRDD, so start by looking at that code: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala

The only thing which is complicated in CoalescedRDD is the PartitionCoalescer, but that is completely irrelevant for you, so you can ignore it. I started writing up a description of what to do but then I realized just writing the code would be easier :) Totally untested, but here you go: https://gist.github.com/squito/c2d1dd5413a60830d6f3

The only really interesting part is getPartitions: https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31 That's where you create the partitions of your new RDD, each of which depends on multiple partitions of the parent. Also note that compute() is very simple: you just concatenate together the iterators from each of the parent partitions: https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37

let me know how it goes!

On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi Imran, I am a bit confused here. Assume I have RDD a with 1000 partitions, which has also been sorted. How can I control, when creating RDD b (with 20 partitions), that partitions 1-50 of RDD a map to the 1st partition of RDD b? I don't see any control code/logic here? Your code below:

val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

Does it mean I need to define/develop my own MyGroupingRDD class? I am not very clear on how to do that; is there any place I can find an example? I have never created my own RDD class before (not an RDD instance :) ). But this is a very valuable approach to me, so I am eager to learn.
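For readers without access to the gist, here is a minimal untested sketch of what such a grouping RDD might look like (names are illustrative; it follows the getPartitions/compute structure described above and mirrors CoalescedRDD's use of NarrowDependency):

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// One partition of the grouped RDD, recording which parent partitions feed it.
class GroupedPartition(val index: Int, val parentIds: Seq[Int]) extends Partition

// Groups consecutive parent partitions into `numGroups` partitions, in order --
// e.g. with 1000 parent partitions and numGroups = 20, group 0 holds parents 0-49.
class MyGroupingRDD[T: ClassTag](prev: RDD[T], numGroups: Int)
    extends RDD[T](prev.context, Nil) {

  private def parentIdsFor(i: Int): Seq[Int] = {
    val per = math.ceil(prev.partitions.length.toDouble / numGroups).toInt
    (i * per) until math.min((i + 1) * per, prev.partitions.length)
  }

  override def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numGroups)(i => new GroupedPartition(i, parentIdsFor(i)))

  override def getDependencies: Seq[Dependency[_]] =
    Seq(new NarrowDependency[T](prev) {
      override def getParents(partitionId: Int): Seq[Int] = parentIdsFor(partitionId)
    })

  // compute() just concatenates the iterators of the grouped parent partitions.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupedPartition].parentIds.iterator
      .flatMap(pid => prev.iterator(prev.partitions(pid), context))
}
```

Usage would then match the snippet quoted below, e.g. new MyGroupingRDD(rawData1000Partitions, 20).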
Regards,
Shuai

*From:* Imran Rashid [mailto:iras...@cloudera.com]
*Sent:* Monday, March 16, 2015 11:22 AM
*To:* Shawn Zheng; user@spark.apache.org
*Subject:* Re: Process time series RDD after sortByKey

Hi Shuai,

On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote:

Sorry I respond late. Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partitions, the best case is that I can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc. If we have the ability to do this, we will gain huge flexibility when we try to process time-series-like data, and a lot of algorithms will benefit from it.

yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD -- it won't create any tasks; it's just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. It's just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions:

val rawData1000Partitions = sc.textFile(...) // or whatever
val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.

note that this is almost exactly the same as what CoalescedRDD does. However, it might combine the partitions in whatever ways it feels like -- you want them combined in a very particular order. So you'll need to create your own subclass.
Back to Zhan Zhang's:

while (iterPartition < RDD.partitions.length) {
  val res = sc.runJob(this, (it: Iterator[T]) => someFunc, iterPartition, allowLocal = true)
  // Some other function after processing one partition.
  iterPartition += 1
}

I am curious how spark processes this without parallelism. Will the individual partition be passed back to the driver to process, or will it just run one task on the node where the partition exists, then follow with another partition on another node?

Not exactly. The partition is not shipped back to the driver. You create a task which will be processed by a worker. The task scheduling will take data locality into account, so ideally the task will get scheduled in the same location where the data already resides. The worker will execute someFunc, and after it's done it will ship the *result* back to the driver. Then the process will get repeated for all the other partitions. If you wanted all the data sent back to the driver, you could use RDD.toLocalIterator. That will send one partition back to the driver, let you process it on the driver, then fetch the next partition, etc.
Re: How to preserve/preset partition information when load time series data?
Hi Shuai, It should certainly be possible to do it that way, but I would recommend against it. If you look at HadoopRDD, it's doing all sorts of little book-keeping that you would most likely want to mimic. eg., tracking the number of bytes and records that are read, setting up all the hadoop configuration, splits, readers, scheduling tasks for locality, etc. That's why I suggested that really you want to just create a small variant of HadoopRDD.

hope that helps,
Imran

On Sat, Mar 14, 2015 at 11:10 AM, Shawn Zheng szheng.c...@gmail.com wrote:

Sorry for replying late. But I just thought of one solution: if I load all the file names themselves (not the contents of the files), so that I have an RDD[(key, Iterable[filename])], then I run mapPartitionsToPair on it with preservesPartitioning=true. Do you think that is a right solution? I am not sure whether it has potential issues if I try to fake/enforce the partitioning in my own way.

Regards,
Shuai

On Wed, Mar 11, 2015 at 8:09 PM, Imran Rashid iras...@cloudera.com wrote:

It should be *possible* to do what you want... but if I understand you right, there isn't really any very easy way to do it. I think you would need to write your own subclass of RDD, which has its own logic on how the input files get divided among partitions. You can probably subclass HadoopRDD and just modify getPartitions(). Your logic could look at the day in each filename to decide which partition it goes into. You'd need to make corresponding changes to HadoopPartition and the compute() method. (Or, if you can't subclass HadoopRDD directly, you can use it for inspiration.)

On Mon, Mar 9, 2015 at 11:18 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All, I have a set of time series data files; they are in parquet format, and the data for each day is stored following a naming convention, but I will not know how many files there are for one day:

20150101a.parq
20150101b.parq
20150102a.parq
20150102b.parq
20150102c.parq
…
201501010a.parq
…

Now I try to write a program to process the data.
And I want to make sure each day's data goes into one partition. Of course I could load everything into one big RDD and then partition it, but that would be very slow. As I already know the time series from the file names, is it possible for me to load the data into the RDD while preserving the partitioning? I know I can preserve a partition per file, but is there any way for me to load the RDD and preserve partitioning based on a set of files: one partition holding multiple files? I think it should be possible, because when I load an RDD from 100 files (assume they cross 100 days), I will have 100 partitions (if I disable file splitting when loading). Then I could use a special coalesce to repartition the RDD? But I don't know whether it is possible to do that in current Spark?

Regards,
Shuai
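A hedged sketch of the filename-keyed idea from this thread (illustrative only: listFilenames and readParquetFile are hypothetical helpers you would implement, e.g. via FileSystem.listStatus and a parquet reader, and reading inside tasks loses the HadoopRDD bookkeeping Imran warns about above):

```scala
import org.apache.spark.HashPartitioner

// Sketch: key each filename by its yyyyMMdd prefix, then group so that one
// task handles one day's files. Both helpers below are placeholders.
val filenames: Seq[String] = listFilenames("hdfs:///timeseries/") // hypothetical listing helper
val numDays = filenames.map(_.take(8)).distinct.size
val byDay = sc.parallelize(filenames.map(f => (f.take(8), f)))
  .groupByKey(new HashPartitioner(numDays)) // roughly one day per partition;
                                            // a custom Partitioner would make it exact
val records = byDay.flatMap { case (day, files) =>
  files.iterator.flatMap(file => readParquetFile(file)) // hypothetical reader
}
```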
Re: Process time series RDD after sortByKey
Hi Shuai,

On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote:

Sorry I respond late. Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partitions, the best case is that I can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc. If we have the ability to do this, we will gain huge flexibility when we try to process time-series-like data, and a lot of algorithms will benefit from it.

yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD -- it won't create any tasks; it's just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. It's just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions:

val rawData1000Partitions = sc.textFile(...) // or whatever
val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.

note that this is almost exactly the same as what CoalescedRDD does. However, it might combine the partitions in whatever ways it feels like -- you want them combined in a very particular order. So you'll need to create your own subclass.

Back to Zhan Zhang's:

while (iterPartition < RDD.partitions.length) {
  val res = sc.runJob(this, (it: Iterator[T]) => someFunc, iterPartition, allowLocal = true)
  // Some other function after processing one partition.
  iterPartition += 1
}

I am curious how spark processes this without parallelism. Will the individual partition be passed back to the driver to process, or will it just run one task on the node where the partition exists, then follow with another partition on another node?

Not exactly. The partition is not shipped back to the driver. You create a task which will be processed by a worker. The task scheduling will take data locality into account, so ideally the task will get scheduled in the same location where the data already resides. The worker will execute someFunc, and after it's done it will ship the *result* back to the driver. Then the process will get repeated for all the other partitions. If you wanted all the data sent back to the driver, you could use RDD.toLocalIterator. That will send one partition back to the driver, let you process it on the driver, then fetch the next partition, etc.

Imran
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
Giving a bit more detail on the error would make it a lot easier for others to help you out. Eg., in this case, it would have helped if you'd included your actual compile error. In any case, I'm assuming your issue is b/c that class is private to spark. You can sneak around that by using Class.forName(stringOfClassName) instead:

scala> classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]
<console>:8: error: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler
       classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]
               ^

scala> Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus")
res1: Class[_] = class org.apache.spark.scheduler.HighlyCompressedMapStatus

hope this helps,
Imran

On Thu, Mar 12, 2015 at 12:47 PM, Arun Luthra arun.lut...@gmail.com wrote:

I'm using a pre-built Spark; I'm not trying to compile Spark. The compile error appears when I try to register HighlyCompressedMapStatus in my program:

kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])

If I don't register it, I get a runtime error saying that it needs to be registered (the error only appears when I turn on kryo). However, the code runs smoothly with kryo turned off.

On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com wrote:

I'm not sure what you mean. Are you asking how you can recompile all of spark and deploy it, instead of using one of the pre-built versions? https://spark.apache.org/docs/latest/building-spark.html Or are you seeing compile problems specifically w/ HighlyCompressedMapStatus? The code compiles fine, so I'm not sure what problem you are running into -- we'd need a lot more info to help.

On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote:

Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0.
On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote:

the scala syntax for arrays is Array[T], not T[], so you want to use something like:

kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
kryo.register(classOf[Array[Short]])

nonetheless, spark should take care of this itself. I'll look into it later today.

On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote:

I think this is a Java vs scala syntax issue. Will check.

On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote:

The problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949

I tried this as a workaround:

import org.apache.spark.scheduler._
import org.roaringbitmap._
...
kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
kryo.register(classOf[org.roaringbitmap.RoaringArray])
kryo.register(classOf[org.roaringbitmap.ArrayContainer])
kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
kryo.register(classOf[short[]])

in the build file:

libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8"

This fails to compile:

...:53: identifier expected but ']' found.
[error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

also:

:54: identifier expected but ']' found.
[error] kryo.register(classOf[short[]])

also:

:51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler
[error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])

Suggestions?

Arun
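Combining the two fixes from this thread (Array[T] syntax, plus Class.forName for the package-private class and for the Java inner-class array), the registrations might look like this untested sketch (the helper name is illustrative):

```scala
import com.esotericsoftware.kryo.Kryo

// Sketch of a registration helper; the class list mirrors the one above.
def registerRoaring(kryo: Kryo): Unit = {
  kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
  kryo.register(classOf[org.roaringbitmap.RoaringArray])
  kryo.register(classOf[org.roaringbitmap.ArrayContainer])
  // Scala array syntax is Array[T], not T[]:
  kryo.register(classOf[Array[Short]])
  // The Java inner class and its array type, via their JVM names:
  kryo.register(Class.forName("org.roaringbitmap.RoaringArray$Element"))
  kryo.register(Class.forName("[Lorg.roaringbitmap.RoaringArray$Element;"))
  // Package-private in Spark, so classOf[...] won't compile -- use Class.forName:
  kryo.register(Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus"))
}
```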
Re: saveAsTextFile extremely slow near finish
Is your data skewed? Could it be that there are a few keys with a huge number of records? You might consider outputting

(recordA, count)
(recordB, count)

instead of

recordA
recordA
recordA
...

you could do this with:

input = sc.textFile
pairCounts = input.map{x => (x, 1)}.reduceByKey{_ + _}
sorted = pairCounts.sortByKey
sorted.saveAsTextFile

On Mon, Mar 9, 2015 at 12:31 PM, mingweili0x m...@spokeo.com wrote:

I'm basically running a sort using spark. The spark program will read from HDFS, sort on composite keys, and then save the partitioned result back to HDFS. Pseudo code is like this:

input = sc.textFile
pairs = input.mapToPair
sorted = pairs.sortByKey
values = sorted.values
values.saveAsTextFile

Input size is ~160G, and I specified 1000 partitions in JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job is split into two stages: saveAsTextFile and mapToPair. mapToPair finished in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress, and the last few tasks just take forever and never finish.

Cluster setup: 8 nodes; on each node: 15gb memory, 8 cores
running parameters: --executor-memory 12G --conf spark.cores.max=60

Thank you for any help.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-extremely-slow-near-finish-tp21978.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Top, takeOrdered, sortByKey
I am not entirely sure I understand your question -- are you saying:
* scoring a sample of 50k events is fast
* taking the top N scores of 77M events is slow, no matter what N is
? If so, this shouldn't come as a huge surprise. You can't find the top scoring elements (no matter how small N is) unless you score all 77M of them. Very naively, you would expect scoring 77M events to take ~1000 times as long as scoring 50k events, right? The fact that it doesn't take that much longer is probably b/c of the overhead of just launching the jobs.

On Mon, Mar 9, 2015 at 4:21 PM, Saba Sehrish ssehr...@fnal.gov wrote: *From:* Saba Sehrish ssehr...@fnal.gov *Date:* March 9, 2015 at 4:11:07 PM CDT *To:* user-...@spark.apache.org *Subject:* *Using top, takeOrdered, sortByKey*

I am using Spark for a template matching problem. We have 77 million events in the template library, and we compare the energy of each input event with each of the template events and return a score. In the end we return the best 1 matches with the lowest score; a score of 0 is a perfect match. I down sampled the problem to use only 50k events. For a single event matched across all the events in the template (50k), I see 150-200ms for the score calculation on 25 cores (using a YARN cluster), but after that, when I perform either a top or takeOrdered or even sortByKey, the time reaches 25-50s. So far I have not been able to figure out why there is such a huge gap going from a list of scores to a list of the top 1000 scores, and why sorting or getting the best X matches dominates by such a large factor. One thing I have noticed is that it doesn't matter how many elements I return; the time range is the same 25-50s for 10 - 1 elements. Any suggestions? Am I not using the API properly? scores is a JavaPairRDD<Integer, Double>, and I do something like the following, where numbestmatches is 10, 100, 1 or any number.
List<Tuple2<Integer, Double>> bestscores_list = scores.takeOrdered(numbestmatches, new TupleComparator());

Or

List<Tuple2<Integer, Double>> bestscores_list = scores.top(numbestmatches, new TupleComparator());

Or

List<Tuple2<Integer, Double>> bestscores_list = scores.sortByKey();
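For context on why takeOrdered(k) should still be preferred over a full sortByKey here: each partition only keeps a size-k heap while scanning (roughly O(n log k)), and the driver then merges the small per-partition results, instead of shuffling and sorting all 77M scores. A plain-Java sketch of the per-partition half, under the thread's convention that a lower score is a better match; names and the (id, score) pair encoding are mine:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Keep only the k best (lowest-score) elements while scanning a partition,
// the way takeOrdered does, rather than sorting everything.
public class TopK {
    // scores are {id, score} pairs; returns the k lowest scores, ascending
    public static List<double[]> takeOrdered(List<double[]> scores, int k) {
        // max-heap on score: the worst of the current best-k sits on top
        PriorityQueue<double[]> heap =
            new PriorityQueue<>(k, (a, b) -> Double.compare(b[1], a[1]));
        for (double[] s : scores) {
            if (heap.size() < k) heap.add(s);
            else if (s[1] < heap.peek()[1]) { heap.poll(); heap.add(s); }
        }
        List<double[]> best = new ArrayList<>(heap);
        best.sort(Comparator.comparingDouble(a -> a[1])); // ascending score
        return best;
    }
}
```

The 25-50s the question reports is then mostly the cost of computing all 77M scores plus job launch overhead, which is why the result size N barely matters.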
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
I'm not sure what you mean. Are you asking how you can recompile all of Spark and deploy it, instead of using one of the pre-built versions? https://spark.apache.org/docs/latest/building-spark.html Or are you seeing compile problems specifically w/ HighlyCompressedMapStatus? The code compiles fine, so I'm not sure what problem you are running into -- we'd need a lot more info to help.

On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote: Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0.

On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote: The Scala syntax for arrays is Array[T], not T[], so you want to use something like:

kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
kryo.register(classOf[Array[Short]])

Nonetheless, Spark should take care of this itself. I'll look into it later today.

On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs Scala syntax issue. Will check.

On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: The problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround:

import org.apache.spark.scheduler._
import org.roaringbitmap._
...
kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
kryo.register(classOf[org.roaringbitmap.RoaringArray])
kryo.register(classOf[org.roaringbitmap.ArrayContainer])
kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
kryo.register(classOf[short[]])

in the build file:

libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8"

This fails to compile:

...:53: identifier expected but ']' found.
[error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

also: :54: identifier expected but ']' found.
[error] kryo.register(classOf[short[]])

also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler
[error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])

Suggestions?

Arun
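A side note on the syntax error in this thread: Scala's classOf[Array[Short]] and Java's short[].class both denote the same JVM array class, while classOf[short[]] is valid in neither language. That much can be checked without Kryo at all; the registration calls appear only in comments below, since they would need the Kryo library on the classpath.

```java
// Scala's classOf[Array[Short]] and Java's short[].class are the same JVM
// array class, whose internal name is "[S". With Kryo available, the
// registration would look like:
//   kryo.register(short[].class);            // Java
//   kryo.register(classOf[Array[Short]])     // Scala
public class ArrayClassNames {
    public static String shortArrayClassName() {
        return short[].class.getName(); // JVM internal name for short[]
    }
}
```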
Re: Process time series RDD after sortByKey
This is a very interesting use case. First of all, it's worth pointing out that if you really need to process the data sequentially, you are fundamentally limiting the parallelism you can get. E.g., if you need to process the entire data set sequentially, then you can't get any parallelism. If you can process each hour separately, but need to process the data within an hour sequentially, then the max parallelism you can get for one day is 24. But let's say you're OK with that. Zhan Zhang's solution is good if you just want to process the entire dataset sequentially. But what if you wanted to process each hour separately, so you can at least create 24 tasks that can run in parallel for one day? I think you would need to create your own subclass of RDD that is similar in spirit to what CoalescedRDD does. Your RDD would have 24 partitions, and each partition would depend on some set of partitions in its parent (your sorted RDD with 1000 partitions). I don't think you could use CoalescedRDD directly b/c you want more control over the way the partitions get grouped together. This answer is very similar to my answer to your other question about controlling partitions -- hope it helps! :)

On Mon, Mar 9, 2015 at 5:41 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am processing some time series data. One day might have 500GB, so each hour has around 20GB of data. I need to sort the data before I start processing. Assume I can sort them successfully with *dayRDD.sortByKey*, but after that I might have thousands of partitions (needed to make the sort succeed), maybe 1000 partitions. Then I try to process the data by hour (it doesn't need to be exactly one hour, just some similar time frame). And I can't just re-partition to 24, because then one partition might be too big to fit into memory (if it is 20GB). So is there any way for me to process the underlying partitions in a certain order? Basically I want to call mapPartitionsWithIndex with a range of indexes.
Is there any way to do it? Hope I described my issue clearly. Regards, Shuai
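The custom-RDD idea above reduces to index arithmetic in getPartitions(): each of the 24 output partitions depends on a contiguous range of the ~1000 sorted parent partitions. A plain-Java sketch of that mapping, using equal-sized ranges for simplicity (in practice the boundaries would come from the actual hour keys); the class name is mine:

```java
// Map a sorted parent partition index to one of numGroups contiguous output
// groups, the way a CoalescedRDD-style subclass's getPartitions() would.
// Because the ranges are contiguous, the parent's sort order is preserved
// within each group.
public class HourGroups {
    public static int groupOf(int parentPartition, int parentPartitions, int numGroups) {
        // contiguous, near-equal ranges over [0, parentPartitions)
        return (int) ((long) parentPartition * numGroups / parentPartitions);
    }
}
```

With 1000 sorted partitions and 24 groups, parent partitions 0-41 land in group 0 and so on; each group can then be processed sequentially inside one task while the 24 tasks run in parallel.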
Re: can spark take advantage of ordered data?
Hi Jonathan, you might be interested in https://issues.apache.org/jira/browse/SPARK-3655 (not yet available) and https://github.com/tresata/spark-sorted (not part of spark, but it is available right now). Hopefully thats what you are looking for. To the best of my knowledge that covers what is available now / what is being worked on. Imran On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney jcove...@gmail.com wrote: Hello all, I am wondering if spark already has support for optimizations on sorted data and/or if such support could be added (I am comfortable dropping to a lower level if necessary to implement this, but I'm not sure if it is possible at all). Context: we have a number of data sets which are essentially already sorted on a key. With our current systems, we can take advantage of this to do a lot of analysis in a very efficient fashion...merges and joins, for example, can be done very efficiently, as can folds on a secondary key and so on. I was wondering if spark would be a fit for implementing these sorts of optimizations? Obviously it is sort of a niche case, but would this be achievable? Any pointers on where I should look?
Re: Running Spark from Scala source files other than main file
Did you forget to specify the main class w/ --class Main? Though if that was it, you should at least see *some* error message, so I'm confused myself ...

On Wed, Mar 11, 2015 at 6:53 AM, Aung Kyaw Htet akh...@gmail.com wrote: Hi Everyone, I am developing a Scala app in which the main object does not call the SparkContext; instead, another object defined in the same package creates it, runs the Spark operations, and closes it. The jar file builds successfully in maven, but when I call spark-submit with this jar, Spark does not seem to execute any code. So my code looks like:

[Main.scala]
object Main {
  def main(args: Array[String]) {
    /* check parameters */
    Component1.start(parameters)
  }
}

[Component1.scala]
object Component1 {
  def start {
    val sc = new SparkContext(conf)
    /* do spark operations */
    sc.stop()
  }
}

The above code compiles into Main.jar, but spark-submit does not execute anything and does not show me any error (not in the logs either):

spark-submit --master spark:// Main.jar

I had all this code working before, when I wrote it as a single Scala file, but now that I am separating it into multiple Scala source files, something isn't running right. Any advice on this would be greatly appreciated! Regards, Aung
Re: How to preserve/preset partition information when load time series data?
It should be *possible* to do what you want ... but if I understand you right, there isn't really any very easy way to do it. I think you would need to write your own subclass of RDD, which has its own logic on how the input files get divided among partitions. You can probably subclass HadoopRDD and just modify getPartitions(). Your logic could look at the day in each filename to decide which partition it goes into. You'd need to make corresponding changes to HadoopPartition and the compute() method. (Or if you can't subclass HadoopRDD directly, you can use it for inspiration.)

On Mon, Mar 9, 2015 at 11:18 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have a set of time series data files in parquet format; the data for each day is stored using a naming convention, but I will not know how many files there are for one day:

20150101a.parq
20150101b.parq
20150102a.parq
20150102b.parq
20150102c.parq
…
201501010a.parq
…

Now I am trying to write a program to process the data, and I want to make sure each day's data goes into one partition. Of course I could load everything into one big RDD and then partition it, but that would be very slow. Since I already know the time series from the file names, is it possible for me to load the data into an RDD and preserve the partitioning? I know I can preserve one partition per file, but is there any way for me to load the RDD and preserve partitions based on a set of files: one partition, multiple files? I think it should be possible, because when I load an RDD from 100 files (assume they cover 100 days), I will have 100 partitions (if I disable file splitting when loading). Then I could use a special coalesce to repartition the RDD? But I don't know whether that is possible in current Spark. Regards, Shuai
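A sketch of the filename-to-partition logic such a subclass (or a custom Partitioner used with partitionBy) would need, assuming the yyyyMMdd-prefix naming shown above; the class and method names are hypothetical:

```java
import java.util.List;

// Route files to per-day partitions by parsing the day out of names like
// "20150102b.parq": every file sharing a yyyyMMdd prefix shares a partition,
// regardless of how many files that day happens to have.
public class DayPartitioner {
    public static String dayOf(String fileName) {
        return fileName.substring(0, 8);            // "20150102b.parq" -> "20150102"
    }

    public static int partitionOf(String fileName, List<String> sortedDays) {
        return sortedDays.indexOf(dayOf(fileName)); // one partition per day
    }
}
```

In a getPartitions() override, the same mapping would decide which input splits each partition depends on, so a day with three files still compacts into a single partition.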
Re: scala.Double vs java.lang.Double in RDD
This doesn't involve spark at all; I think this is entirely an issue with how scala deals w/ primitives and boxing. Often it can hide the details for you, but IMO it just leads to far more confusing errors when things don't work out. The issue here is that your map has value type Any, which leads scala to leave it as a boxed java.lang.Double.

scala> val x = 1.5
x: Double = 1.5
scala> x.getClass()
res0: Class[Double] = double
scala> x.getClass() == classOf[java.lang.Double]
res1: Boolean = false
scala> x.getClass() == classOf[Double]
res2: Boolean = true

scala> val arr = Array(1.5, 2.5)
arr: Array[Double] = Array(1.5, 2.5)
scala> arr.getClass().getComponentType() == x.getClass()
res5: Boolean = true
scala> arr.getClass().getComponentType() == classOf[java.lang.Double]
res6: Boolean = false

//this map has java.lang.Double values
scala> val map: Map[String, Any] = arr.map{x => x.toString -> x}.toMap
map: Map[String,Any] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map("1.5").getClass()
res15: Class[_] = class java.lang.Double
scala> map("1.5").getClass() == x.getClass()
res10: Boolean = false
scala> map("1.5").getClass() == classOf[java.lang.Double]
res11: Boolean = true

//this one has Double values
scala> val map2: Map[String, Double] = arr.map{x => x.toString -> x}.toMap
map2: Map[String,Double] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map2("1.5").getClass()
res12: Class[Double] = double
scala> map2("1.5").getClass() == x.getClass()
res13: Boolean = true
scala> map2("1.5").getClass() == classOf[java.lang.Double]
res14: Boolean = false

On Wed, Mar 4, 2015 at 3:17 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I have a function with signature

def aggFun1(rdd: RDD[(Long, (Long, Double))]): RDD[(Long, Any)]

and one with

def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]): RDD[(_Key, Double)]

where all Double classes involved are scala.Double classes (according to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type parameters _Key and _Index are inferred by the Scala compiler).
Now I am writing a test as follows:

val result: Map[Long, Any] = aggFun1(input).collect().toMap
result.values.foreach(v => println(v.getClass))
result.values.foreach(_ shouldBe a[Double])

and I get the following output:

class java.lang.Double
class java.lang.Double
[info] avg
[info] - should compute the average *** FAILED ***
[info] 1.75 was not an instance of double, but an instance of java.lang.Double

So I am wondering about what magic is going on here. Are scala.Double values in RDDs automatically converted to java.lang.Doubles, or am I just missing the implicit back-conversion, etc.? Any help appreciated, Tobias
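The same boxing rule can be seen in plain Java, where Scala's Any corresponds to Object: a primitive double stored under a static type of Object is always the boxed java.lang.Double, and the primitive and boxed class objects are distinct. A small self-contained sketch (the class and method names are mine, for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Mirrors the Scala REPL session above: a value stored under a static type of
// Object (Scala's Any) is necessarily the boxed java.lang.Double, so a check
// against the primitive class double.class (Scala's classOf[Double]) fails.
public class BoxingDemo {
    public static boolean primitiveAndBoxedClassesDiffer() {
        return double.class != Double.class;     // two distinct Class objects
    }

    public static Class<?> classOfValueStoredAsObject() {
        Map<String, Object> m = new HashMap<>(); // like Map[String, Any]
        m.put("x", 1.5);                         // autoboxes to java.lang.Double
        return m.get("x").getClass();            // java.lang.Double, never double
    }
}
```

This is why the `shouldBe a[Double]` assertion in the test fails once the value has passed through an Any-typed map: the runtime class is the boxed one.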
Re: Is the RDD's Partitions determined before hand ?
You can set the number of partitions dynamically -- it's just a parameter to a method, so you can compute it however you want; it doesn't need to be some static constant:

val dataSizeEstimate = yourMagicFunctionToEstimateDataSize()
val numberOfPartitions = yourConversionFromDataSizeToNumPartitions(dataSizeEstimate)
val reducedRDD = someInputRDD.reduceByKey(f, numberOfPartitions)
//or whatever else that needs to know the number of partitions

Of course this means you need to do the work of figuring out those magic functions, but it's certainly possible. I agree with all of Sean's recommendations, but I guess I might put a bit more emphasis on "the one exception are operations that tend to pull data into memory." For me, that has been a very important exception, and it can come up a lot. And though in general a lot of partitions makes sense, there have been recent questions on the user list about folks going too far, using e.g. 100K partitions and then having the bookkeeping overhead dominate. But that's a pretty big number -- you should still be able to err on the side of too many partitions without going that far, I'd imagine.

On Wed, Mar 4, 2015 at 4:17 AM, Jeff Zhang zjf...@gmail.com wrote: Hi Sean, "If you know a stage needs unusually high parallelism for example you can repartition further for that stage." The problem is that we may not know whether high parallelism is needed. E.g. for the join operator, high parallelism may only be necessary for a dataset where lots of records can join together, while for another dataset high parallelism may not be necessary if only a few records join. So my question is whether being unable to change parallelism dynamically at runtime is too inflexible.

On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote: Hm, what do you mean? You can control, to some extent, the number of partitions when you read the data, and can repartition if needed.
You can set the default parallelism too so that it takes effect for most ops that create an RDD. One number of partitions is usually about right for all work (2x or so the number of execution slots). If you know a stage needs unusually high parallelism, for example, you can repartition further for that stage.

On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote: Thanks Sean. But if the partitions of an RDD are determined beforehand, it would not be flexible to run the same program on different datasets. Although for the first stage the partitions can be determined by the input data set, for the intermediate stages it is not possible. Users have to create a policy to repartition or coalesce based on the data set size.

On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote: An RDD has a certain fixed number of partitions, yes. You can't change an RDD. You can repartition() or coalesce() an RDD to make a new one with a different number of partitions, possibly requiring a shuffle.

On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote: I mean, is it possible to change the partition number at runtime? Thanks

-- Best Regards Jeff Zhang
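The "magic functions" in Imran's snippet earlier in this thread can be made concrete under an assumed target partition size; the 128 MB figure in the usage note below is an illustrative choice (roughly an HDFS block), not a Spark constant, and the class name is mine:

```java
// Compute a partition count from an estimated data size: divide by a target
// bytes-per-partition and round up, with a floor so tiny inputs still get
// some parallelism. A sketch only; estimating the size is the hard part.
public class PartitionSizing {
    public static int numPartitions(long estimatedBytes,
                                    long targetBytesPerPartition,
                                    int minPartitions) {
        long n = (estimatedBytes + targetBytesPerPartition - 1)
                 / targetBytesPerPartition;        // ceiling division
        return (int) Math.max(minPartitions, n);
    }
}
```

For example, a 10 GB shuffle at ~128 MB per partition gives numPartitions(10L << 30, 128L << 20, 2) == 80, which could then be passed as the second argument to reduceByKey.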
Re: Global sequential access of elements in RDD
Why would you want to use Spark to sequentially process your entire data set? The entire purpose is to let you do distributed processing -- which means letting partitions get processed simultaneously by different cores / nodes. That being said, occasionally in a bigger pipeline with a lot of distributed operations, you might need to do one segment in a completely sequential manner. You have a few options -- just be aware that with all of them, you are working *around* the idea of an RDD, so make sure you have a really good reason.

1) rdd.toLocalIterator. Still pulls all of the data to the driver, just like rdd.collect(), but it's slightly more scalable since it won't store *all* of the data in memory on the driver (it does still store all of the data of one partition in memory, though).

2) Write the rdd to some external data storage (e.g. hdfs), and then read the data sequentially off of hdfs on your driver. Still needs to pull all of the data to the driver, but you can avoid pulling an entire partition into memory and make it streaming.

3) Create a number of RDDs that consist of just one partition of your original RDD, and then execute actions on them sequentially:

val originalRDD = ... //this should be cached to make sure you don't recompute it
(0 until originalRDD.partitions.size).foreach{partitionIdx =>
  val prunedRDD = new PartitionPruningRDD(originalRDD, {x => x == partitionIdx})
  prunedRDD.runSomeActionHere()
}

Note that PartitionPruningRDD is a developer api, however. This will run your action on one partition at a time, and ideally the tasks will be scheduled on the same node where the partitions have been cached, so you don't need to move the data around. But again, b/c you're turning it into a sequential program, most of your cluster is sitting idle, and you're not really leveraging spark ...

imran

On Fri, Feb 27, 2015 at 1:38 AM, Wush Wu w...@bridgewell.com wrote: Dear all, I want to implement some sequential algorithm on an RDD.
For example:

val conf = new SparkConf()
conf.setMaster("local[2]").
  setAppName("SequentialSuite")
val sc = new SparkContext(conf)
val rdd = sc.
  parallelize(Array(1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9), 2).
  sortBy(x => x, true)
rdd.foreach(println)

I want to see the ordered numbers on my screen, but it shows unordered integers. The two partitions execute the println simultaneously. How do I make the RDD execute a function globally sequentially? Best, Wush
Re: NegativeArraySizeException when doing joins on skewed data
Hi Tristan, at first I thought you were just hitting another instance of https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's entirely related to kryo. Would it be possible for you to try serializing your object using kryo, without involving spark at all? If you are unfamiliar w/ kryo, you could just try something like this; it would also be OK to try out the utils in spark to do it, something like:

val outputStream = new FileOutputStream("/some/local/path/doesn't/really/matter/just/delete/me/afterwards")
val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)
kryoStreamSer.writeObject(yourBigObject).close()

My guess is that this will fail. There is a little of spark's wrapping code involved here too, but I suspect the error is out of our control. From the error, it seems like whatever object you are trying to serialize has more than 2B references:

Caused by: java.lang.NegativeArraySizeException at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

Though that is rather surprising -- it doesn't even seem possible to me with an object that is only 6 GB. There are a handful of other size restrictions and tuning parameters that come with kryo as well. It would be good for us to write up some docs on those limitations, as well as work with the kryo devs to see which ones can be removed. (E.g., another one that I just noticed from browsing the code is that even when writing to a stream, kryo has an internal buffer of limited size, which it periodically flushes. Perhaps we can get kryo to turn off that buffer, or we can at least get it to flush more often.) thanks, Imran

On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers tris...@blackfrog.org wrote: I get the same exception simply by doing a large broadcast of about 6GB. Note that I'm broadcasting a small number (~3m) of fat objects. There's plenty of free RAM.
This and related kryo exceptions seem to crop-up whenever an object graph of more than a couple of GB gets passed around. at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945) at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623) Caused by: java.lang.NegativeArraySizeException at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at
Re: GroupByKey causing problem
Hi Tushar, the most scalable option is probably for you to consider doing some approximation. E.g., sample the data first to come up with the bucket boundaries. Then you can assign data points to buckets without needing to do a full groupByKey. You could even have more passes which correct any errors in your approximation (e.g., see how sortByKey() works, and how it samples the underlying RDD when constructing the RangePartitioner). Though it's more passes through the data, it will probably be much faster since you avoid the expensive groupByKey(). Imran

On Thu, Feb 26, 2015 at 3:38 AM, Tushar Sharma tushars...@gmail.com wrote: Hi, I am trying to apply binning to a large CSV dataset. Here are the steps I am taking:

1. Emit each value of the CSV as (ColIndex, (RowIndex, value))
2. Then I groupByKey (here ColIndex) and get all values of a particular index onto one node, as I have to work on the collection of all values
3. I apply my binning algorithm, which is as follows:
a. Sort the values
b. Iterate through the values and see whether each one differs from the previous one. If not, add it to the same bin. If it does, check the size of the current bin: if it is greater than a particular size (say 5% of the whole dataset), move to a new bin number, else keep the same bin
c. Repeat for each column

Due to this algorithm I can't calculate it partition-wise and merge for the final result. But even with groupByKey I expect it should work -- maybe slowly, but it should finish. I increased the number of partitions to reduce the output of each groupByKey so that it helps the process complete successfully. But even with that it is stuck at the same stage. The executor log says: ExternalMapAppendOnly (spilling to disk) (trying ...). The code works for small CSV files but can't complete for big files.
val inputfile = "hdfs://hm41:9000/user/file1"
val table = sc.textFile(inputfile, 1000)
val withoutHeader: RDD[String] = dropHeader(table)
val kvPairs = withoutHeader.flatMap(retAtrTuple)
//val filter_na = kvPairs.map{case (x, y) => (x, if (y == "NA") "" else y)}
val isNum = kvPairs.map{case (x, y) => (x, isNumeric(y))}.reduceByKey(_ && _)
val numeric_indexes = isNum.filter{case (x, y) => y}.sortByKey().map{case (x, y) => x}.collect()
//val isNum_Arr = isNum.sortByKey().collect()
val kvidx = withoutHeader.zipWithIndex
//val t = kvidx.map{case (a, b) => retAtrTuple(a).map(x => (x, b))}
val t = kvidx.flatMap{case (a, b) => retAtrTuple(a).map(x => (x, b))}
val t2 = t.filter{case (a, b) => numeric_indexes contains a._1}
//val t2 = t.filter{case (a, b) => a._1 == 0}
val t3 = t2.map{case ((a, b), c) => (a, (c, b.toDouble))}
//val t4 = t3.sortBy(_._2._1)
val t4 = t3.groupByKey.map{case (a, b) => (a, classing_summary(b.toArray.sortBy(_._2)))}

def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}

def retAtrTuple(x: String) = {
  val newX = x.split(',')
  for (h <- 0 until newX.length) yield (h, newX(h))
}

def isNumeric(s: String): Boolean = {
  (allCatch opt s.toDouble).isDefined
}

def classing_summary(arr: Array[(Long, Double)]) = {
  var idx = 0L
  var value = 0.0
  var prevValue = Double.MinValue
  var counter = 1
  var classSize = 0.0
  var size = arr.length
  val output = for (i <- 0 until arr.length) yield {
    idx = arr(i)._1
    value = arr(i)._2
    if (value == prevValue) {
      classSize += 1.0/size
      //println("both values same")
      //println(idx, value, classSize, counter, classSize)
      prevValue = value
      (idx, value, counter, classSize)
    } else if (classSize < 0.05) {
      classSize += 1.0/size
      //println("both values not same, adding to present bucket")
      //println(idx, value, classSize, counter, classSize)
      prevValue = value
      (idx, value, counter, classSize)
    } else {
      classSize = 1.0/size
      counter += 1
      //println("both values not same, adding to different bucket")
      //println(idx, value, classSize, counter, classSize)
      prevValue = value
      (idx, value, counter, classSize)
    }
  }
  output.toArray
}

Thanks in advance,
Tushar Sharma
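Imran's sampling suggestion at the top of this thread could be sketched like this: pick approximate bucket boundaries from a sorted sample (much as sortByKey's RangePartitioner samples the RDD), then assign every value to a bucket with a binary search, with no groupByKey. A plain-Java sketch under the assumption of equal-frequency buckets; the names are mine:

```java
import java.util.Arrays;

// Approximate equal-frequency binning: boundaries come from quantiles of a
// sample, and assignment is a cheap per-record binary search, so each
// partition can be binned independently -- no groupByKey required.
public class ApproxBins {
    public static double[] boundaries(double[] sample, int numBins) {
        double[] s = sample.clone();
        Arrays.sort(s);
        double[] bounds = new double[numBins - 1];
        for (int i = 1; i < numBins; i++)
            bounds[i - 1] = s[i * s.length / numBins]; // approximate quantiles
        return bounds;
    }

    public static int binOf(double value, double[] bounds) {
        int idx = Arrays.binarySearch(bounds, value);
        return idx >= 0 ? idx : -idx - 1;              // insertion point = bin
    }
}
```

A correction pass could then count actual bin sizes with a reduceByKey and split or merge bins whose frequency drifted too far from the target.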
Re: Cartesian issue with user defined objects
Any chance your input RDD is being read from hdfs, and you are running into this issue (in the docs on SparkContext#hadoopFile)?

* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.

On Thu, Feb 26, 2015 at 10:38 AM, mrk91 marcogaid...@gmail.com wrote: Hello, I have an issue with the cartesian method. When I use it with Java types everything is ok, but when I use it with RDDs made of objects defined by me, it has very strange behaviors which depend on whether the RDD is cached or not (you can see here http://stackoverflow.com/questions/28727823/creating-a-matrix-of-neighbors-with-spark-cartesian-issue what happens). Is this due to a bug in its implementation or are there any requirements for the objects passed to it? Thanks. Best regards. Marco

-- View this message in context: Cartesian issue with user defined objects http://apache-spark-user-list.1001560.n3.nabble.com/Cartesian-issue-with-user-defined-objects-tp21826.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
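The RecordReader pitfall in the quoted note, in miniature and without Hadoop: one mutable object is reused for every record, so caching references (rather than copies made in a map()) leaves every cached element pointing at the last record. The class here is a stand-in, not Hadoop's Writable:

```java
import java.util.ArrayList;
import java.util.List;

// One mutable "record" object is reused for every input record, the way
// Hadoop's RecordReader reuses a Writable. Caching the reference each time
// (instead of copying the value out, as a map() would) leaves every cached
// slot pointing at the same, last-written object.
public class ReuseDemo {
    static class MutableRecord { int value; }

    // returns {first cached-by-reference value, first cached-by-copy value}
    public static int[] firstCached() {
        MutableRecord reused = new MutableRecord();
        List<MutableRecord> refs = new ArrayList<>();
        List<Integer> copies = new ArrayList<>();
        for (int v : new int[]{1, 2, 3}) {
            reused.value = v;          // the "reader" overwrites in place
            refs.add(reused);          // BUG: adds the same object every time
            copies.add(reused.value);  // fix: copy the value before caching
        }
        return new int[]{refs.get(0).value, copies.get(0)};
    }
}
```

This is also why the behavior in the question changes with caching: caching materializes the duplicated references, while an uncached pipeline may consume each record before it is overwritten.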
Re: Iterating on RDDs
val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER) // or whatever persistence makes more sense for you
...
while(true) {
  val res = grouped.flatMap(F)
  res.collect.foreach(func)
  if(criteria) break
}

On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan kvi...@vt.edu wrote: Hi, I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) Do a groupBy on R (by say source vertex) and call a function F on each group.
(3) Collect the results from the Fs and do some computation.
(4) Repeat the above steps until some criteria is met.

In (2), the groups are always going to be the same (since R is grouped by source vertex). Question: Is R distributed every iteration (in (2)) or is it distributed only once when it is created? A sample code snippet is below.

while(true) {
  val res = R.groupBy[VertexId](G).flatMap(F)
  res.collect.foreach(func)
  if(criteria) break
}

Since the groups remain the same, what is the best way to implement the above logic?
Re: Help me understand the partition, parallelism in Spark
Hi Yong, mostly correct, except for:

"Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys."

No, you will not get 1000 partitions. Spark has to decide how many partitions to use before it even knows how many unique keys there are. If you have 200 as the default parallelism (or you just explicitly make it the second parameter to reduceByKey()), then you will get 200 partitions. The 1000 unique keys will be distributed across the 200 partitions. Ideally they will be distributed pretty equally, but how they get distributed depends on the partitioner (by default you will have a HashPartitioner, so it depends on the hash of your keys). Note that this is more or less the same as in Hadoop MapReduce.

The amount of parallelism matters b/c there are various places in spark where there is some overhead proportional to the size of a partition. So in your example, if you have 1000 unique keys in 200 partitions, you expect about 5 unique keys per partition -- if instead you had 10 partitions, you'd expect 100 unique keys per partition, and thus more data, and you'd be more likely to hit an OOM. But there are many other possible sources of OOM, so this is definitely not the *only* solution. Sorry I can't comment in particular about Spark SQL -- hopefully somebody more knowledgeable can comment on that.

On Wed, Feb 25, 2015 at 8:58 PM, java8964 java8...@hotmail.com wrote: Hi, Sparkers: I come from the Hadoop MapReduce world and am trying to understand some Spark internals. From the web and this list, I keep seeing people talk about increasing the parallelism if you get an OOM error. I tried to read as much documentation as possible to understand RDD partitions and parallelism usage in Spark. I understand that for an RDD from HDFS, by default one partition will be one HDFS block, which is pretty straightforward. I saw that lots of RDD operations support a 2nd parameter for parallelism.
This is the part that confuses me. From my understanding, the parallelism is totally controlled by how many cores you give to your job. Adjusting that parameter, or spark.default.parallelism, shouldn't have any impact. For example, if I have 10G of data in HDFS, and assume the block size is 128M, we get 100 blocks/partitions in the RDD. Now if I transform that RDD into a pair RDD with 1000 unique keys and do a reduceByKey action, using 200 as the default parallelism, here is what I assume: - We have 100 partitions, as the data comes from 100 blocks. Most likely Spark will generate 100 tasks to read and shuffle them? - The 1000 unique keys mean 1000 reducer groups, like in MR - If I set the max cores to 50, then up to 50 tasks can run concurrently. The remaining tasks just have to wait for a core if 50 tasks are already running. - Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys. - I don't know how many tasks will process these 1000 partitions; maybe this is where the parallelism parameter comes in? - No matter what the parallelism is, there are ONLY 50 tasks that can run concurrently. So if we set more cores, more partitions' data will be processed in the executor (which runs more threads in this case), so more memory is needed. I don't see how increasing parallelism could help the OOM in this case. - In my test case with Spark SQL, I gave 24G as the executor heap, and my join between 2 big datasets keeps getting OOM. I kept increasing spark.default.parallelism, from 200 to 400, to 2000, even to 4000, with no help. What really made the query finally finish without OOM was changing --total-executor-cores from 10 to 4. So my questions are: 1) What does parallelism really mean in Spark? In the simple example above, for reduceByKey, what difference does it make if the parallelism changes from 10 to 20?
2) When we talk about partitions in Spark, for data coming from HDFS, I can understand the partitioning clearly. For the intermediate data, the partitioning will be based on the key, right? For group, reduce, and join actions, the unique keys will be the partitions. Is that correct? 3) Why could increasing parallelism help with OOM? I don't get this part. From my limited experience, adjusting the core count is what really matters for memory. Thanks Yong
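To make the partitioner's role in the answer above concrete, here is a standalone sketch in plain Scala (no Spark needed). `nonNegativeMod` mirrors the arithmetic Spark's HashPartitioner performs; the object name and key names are made up for illustration.

```scala
// How a HashPartitioner assigns keys: partition = nonNegativeMod(key.hashCode, n).
// Note Spark picks n (e.g. from spark.default.parallelism or the 2nd argument
// to reduceByKey) before it has seen a single key.
object PartitionSketch {
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Distribute the given keys over n partitions; returns keys-per-partition.
  def distribution(keys: Seq[String], n: Int): Map[Int, Int] =
    keys.groupBy(k => nonNegativeMod(k.hashCode, n)).map { case (p, ks) => (p, ks.size) }
}
```

With 1000 distinct keys and n = 200 you would expect roughly 5 keys per partition, but the exact spread depends on the hash values of your keys, exactly as described above.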
Re: How to tell if one RDD depends on another
no, it does not give you transitive dependencies. You'd have to walk the tree of dependencies yourself, but that should just be a few lines. On Thu, Feb 26, 2015 at 3:32 PM, Corey Nolet cjno...@gmail.com wrote: I see the rdd.dependencies() function, does that include ALL the dependencies of an RDD? Is it safe to assume I can say rdd2.dependencies.contains(rdd1)? On Thu, Feb 26, 2015 at 4:28 PM, Corey Nolet cjno...@gmail.com wrote: Let's say I'm given 2 RDDs and told to store them in a sequence file and they have the following dependency: val rdd1 = sparkContext.sequenceFile().cache() val rdd2 = rdd1.map() How would I tell programmatically, without being the one who built rdd1 and rdd2, whether or not rdd2 depends on rdd1? I'm working on a concurrency model for my application and I won't necessarily know how the two rdds are constructed. What I will know is whether or not rdd1 is cached, but I want to maximize concurrency and run rdd1 and rdd2 together if rdd2 does not depend on rdd1.
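The "few lines" walk can be sketched like this. This is a no-Spark model: `FakeRdd` is a made-up stand-in where `parents` plays the role of `rdd.dependencies.map(_.rdd)` in the real API; lineages are DAGs, so the recursion terminates.

```scala
// Minimal model of an RDD lineage: each node lists only its direct parents,
// like rdd.dependencies. transitiveDeps walks the whole ancestry.
case class FakeRdd(name: String, parents: Seq[FakeRdd] = Nil)

object DepWalk {
  def transitiveDeps(rdd: FakeRdd): Set[FakeRdd] = {
    val direct = rdd.parents.toSet
    direct ++ direct.flatMap(transitiveDeps)
  }

  // rdd2.dependencies.contains(rdd1) only checks direct parents;
  // this checks the full lineage.
  def dependsOn(a: FakeRdd, b: FakeRdd): Boolean = transitiveDeps(a).contains(b)
}
```

With the real API you would replace `parents` by `rdd.dependencies.map(_.rdd)` and compare RDDs by reference.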
Re: Brodcast Variable updated from one transformation and used from another
Hi Yiannis, Broadcast variables are meant for *immutable* data. They are not meant for data structures that you intend to update. (It might *happen* to work when running in local mode, though I doubt it, and it would probably be a bug if it did. It will certainly not work when running on a cluster.) This probably seems like a huge restriction, but it's really fundamental to spark's execution model. B/c they are immutable, spark can make optimizations around when and how the broadcast variable is shared. Furthermore, it's very important for having clearly defined semantics. Eg., imagine that your broadcast variable was a hashmap. What would the eventual result be if task 1 updated key X to have value A, but task 2 updated key X to have value B? How should the updates from each task be combined together? You have a few alternatives. It really depends a lot on your use case which one is right; there are a lot of factors to consider. 1) put your updates in another RDD, collect() it, update your variable on the driver, rebroadcast it. (least scalable) 2) use an accumulator to get the updates from each stage. (maybe a bit more efficient) 3) use some completely different mechanism for storing the data in your broadcast var. Eg., use a distributed key-value store. Or put the data in another RDD, which you join against your data. (most scalable, but may not be applicable at all.) which one is right depends a lot on what you are trying to do. Imran On Wed, Feb 25, 2015 at 8:02 AM, Yiannis Gkoufas johngou...@gmail.com wrote: What I think is happening is that the map operations are executed concurrently and the map operation in rdd2 has the initial copy of myObjectBroadcasted. Is there a way to apply the transformations sequentially? First materialize rdd1 and then rdd2. Thanks a lot!
On 24 February 2015 at 18:49, Yiannis Gkoufas johngou...@gmail.com wrote: Sorry for the mistake, I actually have it this way: val myObject = new MyObject(); val myObjectBroadcasted = sc.broadcast(myObject); val rdd1 = sc.textFile("/file1").map(e => { myObjectBroadcasted.value.insert(e._1); (e._1,1) }); rdd.cache.count(); //to make sure it is transformed. val rdd2 = sc.textFile("/file2").map(e => { val lookedUp = myObjectBroadcasted.value.lookup(e._1); (e._1, lookedUp) }); On 24 February 2015 at 17:36, Ganelin, Ilya ilya.gane...@capitalone.com wrote: You're not using the broadcasted variable within your map operations. You're attempting to modify myObject directly, which won't work because you are modifying the serialized copy on the executor. You want to do myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup. -Original Message- *From: *Yiannis Gkoufas [johngou...@gmail.com] *Sent: *Tuesday, February 24, 2015 12:12 PM Eastern Standard Time *To: *user@spark.apache.org *Subject: *Brodcast Variable updated from one transformation and used from another Hi all, I am trying to do the following. val myObject = new MyObject(); val myObjectBroadcasted = sc.broadcast(myObject); val rdd1 = sc.textFile("/file1").map(e => { myObject.insert(e._1); (e._1,1) }); rdd.cache.count(); //to make sure it is transformed. val rdd2 = sc.textFile("/file2").map(e => { val lookedUp = myObject.lookup(e._1); (e._1, lookedUp) }); When I check the contents of myObject within the map of rdd1 everything seems ok. On the other hand when I check the contents of myObject within the map of rdd2 it seems to be empty. Am I doing something wrong? Thanks a lot!
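Alternatives (1) and (2) above share one idea: tasks return their updates *as data* instead of mutating shared state, and the driver merges them with an explicit combine rule, which answers the "how should updates be combined?" question directly. A no-Spark sketch in plain Scala (all names here are made up for illustration):

```scala
object MergeUpdates {
  // Each "task" returns its updates as a plain map (here: word counts over
  // its slice of input) rather than mutating a shared object.
  def runTask(lines: Seq[String]): Map[String, Int] =
    lines.groupBy(identity).map { case (k, vs) => (k, vs.size) }

  // The driver merges all per-task updates with an explicit combine function,
  // so concurrent updates to the same key have well-defined semantics.
  def merge(updates: Seq[Map[String, Int]])(combine: (Int, Int) => Int): Map[String, Int] =
    updates.flatten.groupBy(_._1).map { case (k, kvs) =>
      (k, kvs.map(_._2).reduce(combine))
    }
}
```

With real RDDs, option (1) corresponds to producing the per-task maps with a transformation, `collect()`-ing them, merging on the driver, and broadcasting the result again.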
Re: How to get yarn logs to display in the spark or yarn history-server?
the spark history server and the yarn history server are totally independent. Spark knows nothing about yarn logs, and vice versa, so unfortunately there isn't any way to get all the info in one place. On Tue, Feb 24, 2015 at 12:36 PM, Colin Kincaid Williams disc...@uw.edu wrote: Looks like in my tired state, I didn't mention spark the whole time. However, it might be implied by the application log above. Spark log aggregation appears to be working, since I can run the yarn command above. I do have yarn logging setup for the yarn history server. I was trying to use the spark history-server, but maybe I should try setting spark.yarn.historyServer.address to the yarn history-server, instead of the spark history-server? I tried this configuration when I started, but didn't have much luck. Are you getting your spark apps run in yarn client or cluster mode in your yarn history server? If so can you share any spark settings? On Tue, Feb 24, 2015 at 8:48 AM, Christophe Préaud christophe.pre...@kelkoo.com wrote: Hi Colin, Here is how I have configured my hadoop cluster to have yarn logs available through both the yarn CLI and the _yarn_ history server (with gzip compression and 10 days retention): 1. Add the following properties in the yarn-site.xml on each node manager and on the resource manager:

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>864000</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://dc1-kdp-dev-hadoop-03.dev.dc1.kelkoo.net:19888/jobhistory/logs</value>
</property>
<property>
  <name>yarn.nodemanager.log-aggregation.compression-type</name>
  <value>gz</value>
</property>

2.
Restart yarn and then start the yarn history server on the server defined in the yarn.log.server.url property above: /opt/hadoop/sbin/mr-jobhistory-daemon.sh stop historyserver # should fail if historyserver is not yet started /opt/hadoop/sbin/stop-yarn.sh /opt/hadoop/sbin/start-yarn.sh /opt/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver It may be slightly different for you if the resource manager and the history server are not on the same machine. Hope it will work for you as well! Christophe. On 24/02/2015 06:31, Colin Kincaid Williams wrote: Hi, I have been trying to get my yarn logs to display in the spark history-server or yarn history-server. I can see the log information yarn logs -applicationId application_1424740955620_0009 15/02/23 22:15:14 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to us3sm2hbqa04r07-comp-prod-local Container: container_1424740955620_0009_01_02 on us3sm2hbqa07r07.comp.prod.local_8041 === LogType: stderr LogLength: 0 Log Contents: LogType: stdout LogLength: 897 Log Contents: [GC [PSYoungGen: 262656K-23808K(306176K)] 262656K-23880K(1005568K), 0.0283450 secs] [Times: user=0.14 sys=0.03, real=0.03 secs] Heap PSYoungGen total 306176K, used 111279K [0xeaa8, 0x0001, 0x0001) eden space 262656K, 33% used [0xeaa8,0xeffebbe0,0xfab0) from space 43520K, 54% used [0xfab0,0xfc240320,0xfd58) to space 43520K, 0% used [0xfd58,0xfd58,0x0001) ParOldGen total 699392K, used 72K [0xbff8, 0xeaa8, 0xeaa8) object space 699392K, 0% used [0xbff8,0xbff92010,0xeaa8) PSPermGen total 35328K, used 34892K [0xbad8, 0xbd00, 0xbff8) object space 35328K, 98% used [0xbad8,0xbcf93088,0xbd00) Container: container_1424740955620_0009_01_03 on us3sm2hbqa09r09.comp.prod.local_8041 === LogType: stderr LogLength: 0 Log Contents: LogType: stdout LogLength: 896 Log Contents: [GC [PSYoungGen: 262656K-23725K(306176K)] 262656K-23797K(1005568K), 0.0358650 secs] [Times: user=0.28 sys=0.04, real=0.04 secs] Heap PSYoungGen total 306176K, used 65712K [0xeaa8, 0x0001, 
0x0001) eden space 262656K, 15% used [0xeaa8,0xed380bf8,0xfab0) from space 43520K, 54% used [0xfab0,0xfc22b4f8,0xfd58) to space 43520K, 0% used [0xfd58,0xfd58,0x0001) ParOldGen total 699392K, used 72K [0xbff8, 0xeaa8, 0xeaa8) object space 699392K, 0% used [0xbff8,0xbff92010,0xeaa8) PSPermGen total 29696K, used 29486K [0xbad8,
Re: sorting output of join operation
sortByKey() is probably the easiest way: import org.apache.spark.SparkContext._ joinedRdd.map{ case (word, (file1Counts, file2Counts)) => (file1Counts, (word, file1Counts, file2Counts)) }.sortByKey() On Mon, Feb 23, 2015 at 10:41 AM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi , To simplify my problem - I have 2 files from which I am reading words. The output is like: file 1 aaa 4 bbb 6 ddd 3 file 2 ddd 2 bbb 6 ttt 5 If I do file1.join(file2) I get (ddd,(3,2)) (bbb,(6,6)). If I want to sort the output by the number of occurrences of the word in file1, how do I achieve that? Any help would be appreciated. Thanks AJ
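The same re-keying trick, demonstrated on plain Scala collections with the sample counts from this thread (the no-Spark modeling and the descending sort direction are my additions for illustration):

```scala
object SortJoined {
  // Sample data from the thread: word -> count per file.
  val file1 = Map("aaa" -> 4, "bbb" -> 6, "ddd" -> 3)
  val file2 = Map("ddd" -> 2, "bbb" -> 6, "ttt" -> 5)

  // Inner join on the word, then sort by the file1 count (descending here),
  // mirroring the "make the sort field the key" idea behind sortByKey().
  def joinedSortedByFile1: Seq[(String, (Int, Int))] =
    file1.collect { case (w, c1) if file2.contains(w) => (w, (c1, file2(w))) }
      .toSeq
      .sortBy { case (_, (c1, _)) => -c1 }
}
```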
Re: Union and reduceByKey will trigger shuffle even same partition?
I think you're getting tripped up by lazy evaluation and the way stage boundaries work (admittedly it's pretty confusing in this case). It is true that up until recently, if you unioned two RDDs with the same partitioner, the result did not have the same partitioner. But that was just fixed here: https://github.com/apache/spark/pull/4629 That does mean that after you update ranks, it will no longer have a partitioner, which will affect the join on your second iteration here: val contributions = links.join(ranks).flatMap But, I think most of the shuffles you are pointing to are a different issue. I may be belaboring something you already know, but I think this is easily confusing. I think the first thing is understanding where you get stage boundaries, and how they are named. Each shuffle introduces a stage boundary. However, the stages get named by the last thing in a stage, which is not really what is always causing the shuffle. Eg., reduceByKey() causes a shuffle, but we don't see that in a stage name. Similarly, map() does not cause a shuffle, but we see a stage with that name. So, what do the stage boundaries we see actually correspond to? 1) map -- that is doing the shuffle write for the following groupByKey 2) groupByKey -- in addition to reading the shuffle output from your map, this is *also* doing the shuffle write for the next shuffle you introduce w/ partitionBy 3) union -- this is doing the shuffle reading from your partitionBy, and then all the work from there right up until the shuffle write for what is immediately after union -- your reduceByKey. 4) lookup is an action, which is why that has another stage. a couple of things to note: (a) your join does not cause a shuffle, b/c both rdds share a partitioner (b) you have two shuffles from groupByKey followed by partitionBy -- you really probably want the 1-arg form of groupByKey(partitioner). hopefully this is helpful to understand how your stages and shuffles correspond to your code.
Imran On Mon, Feb 23, 2015 at 3:35 PM, Shuai Zheng szheng.c...@gmail.com wrote: This also triggers an interesting question: how can I do this locally by code if I want? For example: I have RDD A and B, which have some partitioning; then if I want to join A to B, I might just want to do a mapper-side join (although B itself might be big, B’s local partition is known to be small enough to fit in memory). How can I access another RDD’s local partition in the mapPartitions method? Is there any way to do this in Spark? *From:* Shao, Saisai [mailto:saisai.s...@intel.com] *Sent:* Monday, February 23, 2015 3:13 PM *To:* Shuai Zheng *Cc:* user@spark.apache.org *Subject:* RE: Union and reduceByKey will trigger shuffle even same partition? If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know the data is already well partitioned. So if you want to avoid a shuffle, you have to write the code explicitly to avoid this, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic. Thanks Saisai *From:* Shuai Zheng [mailto:szheng.c...@gmail.com szheng.c...@gmail.com] *Sent:* Monday, February 23, 2015 12:00 PM *To:* user@spark.apache.org *Subject:* Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple page rank program, but it is slow.
And I dug out that part of the reason is a shuffle happening when I call a union action, even though both RDDs share the same partitioner. Below is my test code in spark shell:

import org.apache.spark.HashPartitioner
sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) }).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist
for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) =>
    links.map(dest => (dest, rank / links.size * beta))
  }
  *ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)*
}
ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioning, and leakedMatrix also shares the same partitioner, so I expected no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey after that. But in the end there is a shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once and everything after that to be a local operation, but the screen shows otherwise. Do I have any misunderstanding here?
Re: what does Submitting ... missing tasks from Stage mean?
yeah, this is just the totally normal message when spark executes something. The first time something is run, all of its tasks are missing. I would not worry about cases when all tasks aren't missing if you're new to spark, its probably an advanced concept that you don't care about. (and would take me some time to explain :) On Fri, Feb 20, 2015 at 8:20 AM, shahab shahab.mok...@gmail.com wrote: Hi, Probably this is a silly question, but I couldn't find any clear documentation explaining why one sees 'Submitting ... missing tasks from Stage ...' in the logs. Specially in my case when I do not have any failure in job execution, I wonder why this should happen? Does it have any relation to lazy evaluation? best, /Shahab
Re: Unzipping large files and 2GB partition size.
Hi Joe, The issue is not that you have input partitions that are bigger than 2GB -- its just that they are getting cached. You can see in the stack trace, the problem is when you try to read data out of the DiskStore: org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) Also, just because you see this: 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java: -2) it doesn't *necessarily* mean that this is coming from your map. It can be pretty confusing how your operations on RDDs get turned into stages, it could be a lot more than just your map. and actually, it might not even be your map at all -- some of the other operations you invoke call map underneath the covers. So its hard to say what is going on here w/ out seeing more code. Anyway, maybe you've already considered all this (you did mention the lazy execution of the DAG), but I wanted to make sure. it might help to use rdd.setName() and also to look at rdd.toDebugString. As far as what you can do about this -- it could be as simple as moving your rdd.persist() to after you have compressed and repartitioned your data. eg., I'm blindly guessing you have something like this:

val rawData = sc.hadoopFile(...)
rawData.persist(DISK)
rawData.count()
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
...

change it to something like:

val rawData = sc.hadoopFile(...)
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
repartitionedData.persist(DISK)
repartitionedData.count()
...

The point is, you avoid caching any data until you have ensured that the partitions are small. You might have big partitions before that in rawData, but that is OK. Imran On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote: Thanks for your reply Sean.
Looks like it's happening in a map: 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2) That's my initial 'parse' stage, done before repartitioning. It reduces the data size significantly so I thought it would be sensible to do before repartitioning, which involves moving lots of data around. That might be a stupid idea in hindsight! So the obvious thing to try would be to try repartitioning before the map as the first transformation. I would have done that if I could be sure that it would succeed or fail quickly. I'm not entirely clear about the lazy execution of transformations in DAG. It could be that the error is manifesting during the mapToPair, but caused by the earlier read from text file stage. Thanks for pointers to those compression formats. I'll give them a go (although it's not trivial to re-encode 200 GB of data on S3, so if I can get this working reasonably with gzip I'd like to). Any advice about whether this error can be worked round with an early partition? Cheers Joe On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote: gzip and zip are not splittable compression formats; bzip and lzo are. Ideally, use a splittable compression format. Repartitioning is not a great solution since it means a shuffle, typically. This is not necessarily related to how big your partitions are. The question is, when does this happen? what operation? On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote: On the advice of some recent discussions on this list, I thought I would try and consume gz files directly. I'm reading them, doing a preliminary map, then repartitioning, then doing normal spark things. As I understand it, zip files aren't readable in partitions because of the format, so I thought that repartitioning would be the next best thing for parallelism. I have about 200 files, some about 1GB compressed and some over 2GB uncompressed. 
I'm hitting the 2GB maximum partition size. It's been discussed on this list (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391). Stack trace at the end. This happened at 10 hours in (probably when it saw its first file). I can't just re-run it quickly! Does anyone have any advice? Might I solve this by re-partitioning as the first step after reading the file(s)? Or is it effectively impossible to read a gz file that expands to over 2GB? Does anyone have any experience with this? Thanks in advance Joe Stack trace: Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager: Lost task 5.3 in stage 1.0 (TID 283) on executor: java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE) [duplicate 6] org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0: java.lang.IllegalArgumentException: Size exceeds
Re: Unzipping large files and 2GB partition size.
oh, I think you are just choosing a number that is too small for your number of partitions. All of the data in /dir/to/gzfiles is going to be sucked into one RDD, with the data divided into partitions. So if you're parsing 200 files, each about 2 GB, and then repartitioning down to 100 partitions, you would expect 4 GB per partition. Though you're filtering the data down some, there may also be some bloat from your parsed objects. Also, if you're not using kryo for serialization, I'd strongly recommend it over the default serialization, and try to register all your classes. I think you can get some information about how much data is in your RDDs from the UI -- but it might depend on what version of spark you are running, plus I think the info isn't saved on failed stages, so you might just need to monitor it in the UI as it's happening (I am not 100% sure about that ...) So I'd suggest (a) using a lot more partitions (maybe 1k, given your data size) and (b) turning on kryo if you haven't already. On Thu, Feb 19, 2015 at 9:36 AM, Joe Wass jw...@crossref.org wrote: Thanks for your detailed reply Imran. I'm writing this in Clojure (using Flambo which uses the Java API) but I don't think that's relevant. So here's the pseudocode (sorry I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
val parsedFiles = rawData.map(parseFunction) // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100) // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)
val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that partition size would be («size of expanded file» / «number of partitions»). In this case, 100 (which I picked out of the air).
I wonder whether the «size of expanded file» is actually the size of all concatenated input files (probably about 800 GB)? In that case should I multiply it by the number of files? Or perhaps I'm barking up completely the wrong tree. Joe On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote: Hi Joe, The issue is not that you have input partitions that are bigger than 2GB -- its just that they are getting cached. You can see in the stack trace, the problem is when you try to read data out of the DiskStore: org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) Also, just because you see this: 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2) it doesn't *necessarily* mean that this is coming from your map. It can be pretty confusing how your operations on RDDs get turned into stages, it could be a lot more than just your map. and actually, it might not even be your map at all -- some of the other operations you invoke call map underneath the covers. So its hard to say what is going on here w/ out seeing more code. Anyway, maybe you've already considered all this (you did mention the lazy execution of the DAG), but I wanted to make sure. it might help to use rdd.setName() and also to look at rdd.toDebugString. As far as what you can do about this -- it could be as simple as moving your rdd.persist() to after you have compressed and repartitioned your data. eg., I'm blindly guessing you have something like this: val rawData = sc.hadoopFile(...) rawData.persist(DISK) rawData.count() val compressedData = rawData.map{...} val repartitionedData = compressedData.repartition(N) ... change it to something like: val rawData = sc.hadoopFile(...) val compressedData = rawData.map{...} val repartitionedData = compressedData.repartition(N) repartitionedData.persist(DISK) repartitionedData.count() ... 
The point is, you avoid caching any data until you have ensured that the partitions are small. You might have big partitions before that in rawData, but that is OK. Imran On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote: Thanks for your reply Sean. Looks like it's happening in a map: 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2) That's my initial 'parse' stage, done before repartitioning. It reduces the data size significantly so I thought it would be sensible to do before repartitioning, which involves moving lots of data around. That might be a stupid idea in hindsight! So the obvious thing to try would be to try repartitioning before the map as the first transformation. I would have done that if I could be sure that it would succeed or fail quickly. I'm not entirely clear about the lazy execution of transformations in DAG. It could be that the error is manifesting during the mapToPair
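A quick sanity check for the numbers in this thread, as a standalone sketch in plain Scala (no Spark): given a total data size and a per-partition cap, the smallest workable partition count. The helper name and the 800 GB figure (the thread's rough uncompressed estimate) are assumptions.

```scala
object PartitionMath {
  // Smallest partition count that keeps the *average* partition under a cap
  // (ceiling division). Real data skews, so in practice leave generous
  // headroom well below the 2 GB block limit.
  def minPartitions(totalBytes: Long, maxPartitionBytes: Long): Long =
    (totalBytes + maxPartitionBytes - 1) / maxPartitionBytes
}
```

For ~800 GB against a 2 GB cap this gives at least 400 partitions, which is consistent with the "maybe 1k, given your data size" suggestion above once headroom for skew is added.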
Re: Some tasks taking too much time to complete in a stage
almost all your data is going to one task. You can see that the shuffle read for task 0 is 153.3 KB, and for most other tasks it's just 26 B (which is probably just some header saying there are no actual records). You need to ensure your data is more evenly distributed before this step. On Thu, Feb 19, 2015 at 10:53 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I am running Spark 1.2.1 for compute-intensive jobs comprising multiple tasks. I have observed that most tasks complete very quickly, but there are always one or two tasks that take a lot of time to complete, thereby increasing the overall stage time. What could be the reason for this? Following are the statistics for one such stage. As you can see, the task with index 0 takes 1.1 minutes whereas others completed much more quickly. Aggregated Metrics by Executor Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input Output Shuffle Read Shuffle Write Shuffle Spill (Memory) Shuffle Spill (Disk) 0 slave1:56311 46 s 13 0 13 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B 1 slave2:42648 2.1 min 13 0 13 0.0 B 0.0 B 384.3 KB 0.0 B 0.0 B 0.0 B 2 slave3:44322 23 s 12 0 12 0.0 B 0.0 B 136.4 KB 0.0 B 0.0 B 0.0 B 3 slave4:37987 44 s 12 0 12 0.0 B 0.0 B 213.9 KB 0.0 B 0.0 B 0.0 B Tasks Index ID Attempt Status Locality Level Executor ID / Host Launch Time Duration GC Time Shuffle Read Errors 0 213 0 SUCCESS PROCESS_LOCAL 1 / slave2 2015/02/19 11:40:05 1.1 min 1 s 153.3 KB 5 218 0 SUCCESS PROCESS_LOCAL 3 / slave4 2015/02/19 11:40:05 23 ms 26.0 B 1 214 0 SUCCESS PROCESS_LOCAL 3 / slave4 2015/02/19 11:40:05 2 s 0.9 s 13.8 KB 4 217 0 SUCCESS PROCESS_LOCAL 1 / slave2 2015/02/19 11:40:05 26 ms 26.0 B 3 216 0 SUCCESS PROCESS_LOCAL 0 / slave1 2015/02/19 11:40:05 11 ms 0.0 B 2 215 0 SUCCESS PROCESS_LOCAL 2 / slave3 2015/02/19 11:40:05 27 ms 26.0 B 7 220 0 SUCCESS PROCESS_LOCAL 0 / slave1 2015/02/19 11:40:05 11 ms 0.0 B 10 223 0 SUCCESS PROCESS_LOCAL 2 / slave3 2015/02/19 11:40:05 23 ms 26.0 B 6 219 0 SUCCESS PROCESS_LOCAL 2 / slave3
2015/02/19 11:40:05 23 ms 26.0 B 9 222 0 SUCCESS PROCESS_LOCAL 3 / slave4 2015/02/19 11:40:05 23 ms 26.0 B 8 221 0 SUCCESS PROCESS_LOCAL 1 / slave2 2015/02/19 11:40:05 23 ms 26.0 B 11 224 0 SUCCESS PROCESS_LOCAL 0 / slave1 2015/02/19 11:40:05 10 ms 0.0 B 14 227 0 SUCCESS PROCESS_LOCAL 2 / slave3 2015/02/19 11:40:05 24 ms 26.0 B 13 226 0 SUCCESS PROCESS_LOCAL 3 / slave4 2015/02/19 11:40:05 23 ms 26.0 B 16 229 0 SUCCESS PROCESS_LOCAL 1 / slave2 2015/02/19 11:40:05 22 ms 26.0 B 12 225 0 SUCCESS PROCESS_LOCAL 1 / slave2 2015/02/19 11:40:05 22 ms 26.0 B 15 228 0 SUCCESS PROCESS_LOCAL 0 / slave1 2015/02/19 11:40:05 10 ms 0.0 B 17 230 0 SUCCESS PROCESS_LOCAL 3 / slave4 2015/02/19 11:40:05 22 ms 26.0 B 23 236 0 SUCCESS PROCESS_LOCAL 0 / slave1 2015/02/19 11:40:05 10 ms 0.0 B 22 235 0 SUCCESS PROCESS_LOCAL 2 / slave3 2015/02/19 11:40:05 21 ms 26.0 B 19 232 0 SUCCESS PROCESS_LOCAL 0 / slave1 2015/02/19 11:40:05 10 ms 0.0 B 21 234 0 SUCCESS PROCESS_LOCAL 3 / slave4 2015/02/19 11:40:05 25 ms 26.0 B 18 231 0 SUCCESS PROCESS_LOCAL 2 / slave3 2015/02/19 11:40:05 24 ms 26.0 B 20 233 0 SUCCESS PROCESS_LOCAL 1 / slave2 2015/02/19 11:40:05 28 ms 26.0 B 25 238 0 SUCCESS PROCESS_LOCAL 3 / slave4 2015/02/19 11:40:05 20 ms 26.0 B 28 241 0 SUCCESS PROCESS_LOCAL 1 / slave2 2015/02/19 11:40:05 27 ms 26.0 B 27 240 0 SUCCESS PROCESS_LOCAL 0 / slave1 2015/02/19 11:40:05 10 ms 0.0 B Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Some-tasks-taking-too-much-time-to-complete-in-a-stage-tp21724.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
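"Ensure your data is more evenly distributed" is commonly done by salting hot keys. Salting is not discussed in this thread; it is a standard skew fix sketched here in plain Scala with made-up data, where the salted variant guarantees a hot key spreads over several partitions.

```scala
object SaltSketch {
  // Plain hash partitioning: every record of a hot key hits one partition.
  def part(key: String, n: Int): Int = Math.floorMod(key.hashCode, n)

  // Salting: split a hot key into n sub-keys and route salt s of key k to
  // partition (part(k) + s) % n, so its records land in n different
  // partitions. Aggregate per salted key first, then combine the n partial
  // results in a second, much smaller pass.
  def saltedPart(key: String, salt: Int, n: Int): Int =
    Math.floorMod(part(key, n) + salt, n)
}
```

In Spark terms this corresponds to appending a salt to the key before the wide operation and merging the per-salt partial aggregates afterwards.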
Re: Incorrect number of records after left outer join (I think)
if you have duplicate values for a key, join creates all pairs. Eg. if you have 2 values for key X in rdd A and 2 values for key X in rdd B, then A.join(B) will have 4 records for key X. On Thu, Feb 19, 2015 at 3:39 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: Consider the following left outer join potentialDailyModificationsRDD = reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER()); Below are the record counts for the RDDs involved Number of records for reducedDailyPairRDD: 2565206 Number of records for baselinePairRDD: 56102812 Number of records for potentialDailyModificationsRDD: 2570115 Below are the partitioners for the RDDs involved. Partitioner for reducedDailyPairRDD: Some(org.apache.spark.HashPartitioner@400) Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400) Partitioner for potentialDailyModificationsRDD: Some(org.apache.spark.HashPartitioner@400) I realize in the above statement that the .partitionBy is probably not needed as the underlying RDDs used in the left outer join are already hash partitioned. My question is how the resulting RDD (potentialDailyModificationsRDD) can end up with more records than reducedDailyPairRDD. I would think the number of records in potentialDailyModificationsRDD should be 2565206 instead of 2570115. Am I missing something or is this possibly a bug? I'm using Apache Spark 1.2 on a stand-alone cluster on ec2. To get the counts for the records, I'm using .count() on the RDD. Thanks. Darin.
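The "all pairs" behavior can be modeled on plain Scala collections (a no-Spark sketch; `JoinModel` and the sample keys are made up). Every combination of a left value and a right value for a shared key appears in the output, so duplicate keys multiply the record count:

```scala
object JoinModel {
  // Inner join on sequences of pairs: for each shared key, emit one record
  // per (value-from-a, value-from-b) combination, like RDD.join.
  def join[K, A, B](a: Seq[(K, A)], b: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val byKey = b.groupBy(_._1)
    for {
      (k, av) <- a
      (_, bv) <- byKey.getOrElse(k, Nil)
    } yield (k, (av, bv))
  }
}
```

With 2 values for key X on each side, the join yields 2 x 2 = 4 records for X, which is exactly how a left outer join can return more rows than its left input.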
Re: Filter data from one RDD based on data from another RDD
The more scalable alternative is to do a join (or a variant like cogroup, leftOuterJoin, subtractByKey, etc., found in PairRDDFunctions). The downside is that this requires a shuffle of both your RDDs.

On Thu, Feb 19, 2015 at 3:36 PM, Himanish Kushary himan...@gmail.com wrote:

Hi, I have two RDDs with CSV data as below:

RDD-1
101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb43,19229261643
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9229261645
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,9229261647
101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d8,791191603
101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb42,19229261643
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,290542385
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,922926164

RDD-2
101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d9,7911160
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,2954238
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9226164
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,92292164
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,9226164
101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb40,929164
101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb39,26164

I need to filter RDD-2 to include only those records where the first column value in RDD-2 matches any of the first column values in RDD-1. Currently, I am broadcasting the first column values from RDD-1 as a set and then filtering RDD-2 based on that set:

val rdd1broadcast = sc.broadcast(rdd1.map { uu => uu.split(",")(0) }.collect().toSet)
val rdd2filtered = rdd2.filter { h => rdd1broadcast.value.contains(h.split(",")(0)) }

This will result in the data with first column 101970_5854301838 (the last two records) being filtered out from RDD-2. Is this the best way to accomplish this? I am worried that for large data volumes the broadcast step may become an issue. Appreciate any other suggestions. --- Thanks Himanish
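The join-based alternative suggested above can be sketched without a cluster. This is a plain-Python stand-in (hypothetical helper name, not Spark code) for what keying both datasets by the first CSV column and inner-joining would produce; in real Spark this is roughly rdd2.keyBy(...).join(rdd1.keyBy(...).distinct()).values(), at the cost of shuffling both sides:

```python
from collections import defaultdict

def filter_via_join(rdd1_rows, rdd2_rows):
    """Keep only the rdd2 rows whose first CSV column also appears as a
    first column somewhere in rdd1 (inner-join-on-key semantics)."""
    rdd1_keys = {row.split(",", 1)[0] for row in rdd1_rows}  # distinct join keys
    rdd2_by_key = defaultdict(list)
    for row in rdd2_rows:
        rdd2_by_key[row.split(",", 1)[0]].append(row)
    # Only keys present on both sides survive the join.
    return [row for key in rdd1_keys & rdd2_by_key.keys()
            for row in rdd2_by_key[key]]

rdd1 = ["101970_17038953,uuid-a,791191603", "101970_5851048323,uuid-b,290542385"]
rdd2 = ["101970_17038953,uuid-c,7911160", "101970_5854301838,uuid-d,929164"]
print(filter_via_join(rdd1, rdd2))  # only the 101970_17038953 row survives
```

The broadcast version is fine while the distinct key set of RDD-1 fits comfortably in driver and executor memory; the join trades that memory bound for a shuffle of both datasets.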
Re: Failure on a Pipe operation
The error message is telling you the exact problem: it can't find ProgramSIM, the thing you are trying to run.

Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev): java.io.IOException: Cannot run program "ProgramSIM": error=2, No such file or directory

On Thu, Feb 19, 2015 at 5:52 PM, athing goingon athinggoin...@gmail.com wrote: Hi, I'm trying to figure out why the following job is failing on a pipe http://pastebin.com/raw.php?i=U5E8YiNN with this exception: http://pastebin.com/raw.php?i=07NTGyPP Any help is welcome. Thank you.
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
So if you only change this line: https://gist.github.com/emres/0fb6de128baea099e741#file-mymoduledriver-java-L137 to json.print(), it processes 16 files instead? I am totally perplexed. My only suggestions to help debug are: (1) see what happens when you get rid of MyModuleWorker completely -- change MyModuleDriver#process to just inStream.print() and see what happens; (2) stick a bunch of printlns into MyModuleWorker#call; (3) turn on DEBUG logging for org.apache.spark.streaming.dstream.FileInputDStream. My gut instinct is that something else is flaky about the file input stream (e.g., it makes some assumptions about the file system which may not be valid in your case; it has a bunch of caveats), and that it has just happened to work sometimes with your foreachRDD and failed sometimes with print. Sorry I am not a lot of help in this case; hope this leads you down the right track, or somebody else can help out. Imran

On Wed, Feb 18, 2015 at 2:28 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Imran, (a) I know that all 20 files are processed when I use foreachRDD, because I can see the processed files in the output directory. (My application logic writes them to an output directory after they are processed, *but* that writing operation does not happen in foreachRDD; below you can see the URL that includes my code and clarifies this.) (b) I know only 16 files are processed because in the output directory I see only 16 processed files. I wait for minutes and minutes and no more files appear in the output directory. When only 16 files have been processed and Spark Streaming has gone into the mode of idly watching the input directory, if I then copy a few more files, they are also processed.
(c) Sure, you can see part of my code in the following gist: https://gist.github.com/emres/0fb6de128baea099e741 It might seem a little convoluted at first, because my application is divided into two classes: a Driver class (setting up things and initializing them) and a Worker class (that implements the core functionality). I've also put in the relevant methods from my utility classes for completeness. I am as perplexed as you are as to why forcing the output via foreachRDD resulted in different behaviour compared to simply using the print() method. Kind regards, Emre

On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com wrote: Hi Emre, there shouldn't be any difference in which files get processed w/ print() vs. foreachRDD(). In fact, if you look at the definition of print(), it is just calling foreachRDD() underneath. So there is something else going on here. We need a little more information to figure out exactly what is going on. (I think Sean was getting at the same thing ...) (a) How do you know that when you use foreachRDD, all 20 files get processed? (b) How do you know that only 16 files get processed when you print()? Do you know the other files are being skipped, or maybe they are just stuck somewhere? E.g., suppose you start w/ 20 files, and you see 16 get processed ... what happens after you add a few more files to the directory? Are they processed immediately, or are they never processed either? (c) Can you share any more code of what you are doing to the dstreams *before* the print() / foreachRDD()? That might give us more details about what the difference is. I can't see how .count.print() would be different than just print(), but maybe I am missing something also. Imran

On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote: Sean, In this case, I've been testing the code on my local machine and using Spark locally, so all the log output was available on my terminal.
And I've used the .print() method to have an output operation, just to force Spark to execute. I was not using foreachRDD; I was only using the print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything, because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with the foreachRDD approach for now. (Though it is still a mystery to me why using .print() made a difference. Maybe my mental model of Spark is wrong; I thought that no matter what output operation I used, the number of files processed by Spark would be independent of it, because the processing is done in a different method, and .print() is only used to force Spark to execute that processing. Am I wrong?) -- Emre

On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Hi Tom, there are a couple of things you can do here to make this more efficient. First, I think you can replace your self-join with a groupByKey. On your example data set, this would give you:

(1, Iterable(2,3))
(4, Iterable(3))

This reduces the amount of data that needs to be shuffled, and that way you can produce all of your pairs just from the Iterable(2,3). Second, if you expect the same pairs to appear many times in your dataset, you might first want to replace them with a count. E.g., if you start with

(1,2) (1,2) (1,2) ... (1,2) (1,3) (1,3) (4,3) ...

you might want to first convert that to get a count of each pair:

val pairCounts = rdd.map { x => (x, 1) }.reduceByKey(_ + _)

to give you something like:

((1,2), 145) ((1,3), 2) ((4,3), 982) ...

and then with a little more massaging you can group by key and also keep the counts of each item:

val groupedCounts: RDD[(Int, Iterable[(Int, Int)])] = pairCounts.map { case ((key, value), count) => key -> (value, count) }.groupByKey

which would give you something like:

(1, Iterable((2,145), (3,2)))
(4, Iterable((3,982)))

Hope this helps, Imran

On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote: Thanks for the reply, I'll try your suggestions. Apologies, in my previous post I was mistaken: rdd is actually a PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second, I can count the number of times values occur with the same key. For example, if I have the following:

(1,2) (1,3) (4,3)

Then joining with itself I get:

(1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
(1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
(1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
(1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
(4,(3,3)) -> map -> ((3,3),1) (combined into the ((3,3),2) above)

Note that I want to keep the duplicates (2,2) and reflections.
Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote: Why are you joining the rdd with itself? You can try these things:

- Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.
- Set your default serializer to Kryo: .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- Enable rdd compression: .set("spark.rdd.compress", "true")

Thanks Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote: Hi All, I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage:

val rdd2 = rdd.join(rdd, numPartitions = 1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .reduceByKey((x, y) => x + y)
rdd2.count()

which errors with a stack trace like:

15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
java.lang.OutOfMemoryError: GC overhead limit exceeded
  at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'These two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB. My cluster setup is as follows.
I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06, and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job. Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option. Warm Regards, Thomas
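The groupByKey-plus-counting recipe from the reply above can be sketched without a cluster. This plain-Python stand-in (hypothetical helper name, not Spark code) counts duplicate (key, value) pairs first and then expands each key's value list into the full cross product, multiplying counts, which gives the same co-occurrence totals as the self-join while keeping duplicates like (2,2) and reflections like (2,3)/(3,2):

```python
from collections import Counter
from itertools import product

def cooccurrence_counts(pairs):
    """Co-occurrence counts per key without a self-join:
    (1) count duplicate (key, value) pairs  -- the reduceByKey step;
    (2) group (value, count) entries by key -- the groupByKey step;
    (3) expand each key's entries into all (v1, v2) pairs, weighting
        each pair by the product of the two counts."""
    pair_counts = Counter(pairs)                     # (key, value) -> n
    by_key = {}                                      # key -> [(value, n), ...]
    for (k, v), n in pair_counts.items():
        by_key.setdefault(k, []).append((v, n))
    result = Counter()
    for vals in by_key.values():
        for (v1, n1), (v2, n2) in product(vals, vals):
            result[(v1, v2)] += n1 * n2
    return result

# Tom's example: (1,2) (1,3) (4,3)
counts = cooccurrence_counts([(1, 2), (1, 3), (4, 3)])
print(counts[(2, 2)], counts[(2, 3)], counts[(3, 2)], counts[(3, 3)])  # 1 1 1 2
```

The win over the self-join is that the ~460-million-record intermediate RDD never materializes; only the distinct (key, value) counts are shuffled, and the pair expansion happens per key after the shuffle.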
Re: MapValues and Shuffle Reads
Hi Darin, You are asking for something near and dear to me: https://issues.apache.org/jira/browse/SPARK-1061 There is a PR attached there as well. Note that you could do everything in that PR in your own user code -- you don't need to wait for it to get merged -- *except* for the change to HadoopRDD so that it sorts the input partitions. (Though of course, you could always just have your own implementation of HadoopRDD as well ...) You could also vote for the issue or watch it to encourage some progress on it :)

On Tue, Feb 17, 2015 at 2:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote: Thanks Imran. I think you are probably correct. I was a bit surprised that there was no shuffle read in the initial hash partition step. I will adjust the code as you suggest to prove that is the case. I have a slightly different question. If I save an RDD to S3 (or some equivalent) and this RDD was hash partitioned at the time, do I still need to hash partition the RDD again when I read it in? Is there a way that I could prevent all of the shuffling (such as providing a hint)? (My parts for the RDD will be gzipped, so they would not be splittable.) In reality, that's what I would really want to do in the first place. Thanks again for your insights. Darin.

-- *From:* Imran Rashid iras...@cloudera.com *To:* Darin McBeath ddmcbe...@yahoo.com *Cc:* User user@spark.apache.org *Sent:* Tuesday, February 17, 2015 3:29 PM *Subject:* Re: MapValues and Shuffle Reads

Hi Darin, When you say you see 400GB of shuffle writes from the first code snippet, what do you mean? There is no action in that first set, so it won't do anything. By itself, it won't do any shuffle writing, or anything else for that matter. Most likely, the .count() on your second code snippet is actually causing the execution of some of the first snippet as well. The .partitionBy will result in both shuffle writes and shuffle reads, but they aren't set in motion until the .count further down the line.
It's confusing b/c the stage boundaries don't line up exactly with your RDD variables here: hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged into the stage above it. If you do a hsfBaselinePairRDD.count after your first code snippet, and then run the second code snippet afterwards, is it more like what you expect? Imran

On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In the following code, I read in a large sequence file from S3 (1TB) spread across 1024 partitions. When I look at the job/stage summary, I see about 400GB of shuffle writes, which seems to make sense as I'm doing a hash partition on this file.

// Get the baseline input file
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD<String, String> hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution) and what I find very strange is that when I look at the job/stage summary, I see more than 340GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle read in this step.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDD<String, Long> baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());
log.info("Number of baseline records: " + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin.
Re: How do you get the partitioner for an RDD in Java?
A JavaRDD is just a wrapper around a normal RDD defined in Scala, which is stored in the rdd field. You can access everything that way. The JavaRDD wrappers just provide some interfaces that are a bit easier to work with in Java. If this is at all convincing, here's me demonstrating it inside the spark-shell (yes, it's Scala, but I'm using the Java API):

scala> val jsc = new JavaSparkContext(sc)
jsc: org.apache.spark.api.java.JavaSparkContext = org.apache.spark.api.java.JavaSparkContext@7d365529

scala> val data = jsc.parallelize(java.util.Arrays.asList(Array("a", "b", "c")))
data: org.apache.spark.api.java.JavaRDD[Array[String]] = ParallelCollectionRDD[0] at parallelize at <console>:15

scala> data.rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None

On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In an 'early release' of the Learning Spark book, there is the following reference: "In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java)". However, I don't see the mentioned partitioner() method in Spark 1.2 or a way of getting this information. I'm curious if anyone has any suggestions for how I might go about finding how an RDD is partitioned in a Java program. Thanks. Darin.
Re: Percentile example
(trying to repost to the list w/out URLs -- rejected as spam earlier) Hi, Using take() is not a good idea; as you have noted, it will pull a lot of data down to the driver, so it's not scalable. Here are some more scalable alternatives:

1. Approximate solutions

1a. Sample the data. Just sample some of the data to the driver, sort that data in memory, and take the 66th percentile of that sample.

1b. Make a histogram with pre-determined buckets. E.g., if you know your data ranges from 0 to 1 and is uniform-ish, you could make buckets every 0.01, then count how many data points go into each bucket. Or if you only care about relative error and you have integers (often the case if your data is counts), then you can span the full range of integers with a relatively small number of buckets; e.g., you only need 200 buckets for 5% error. See the Histogram class in Twitter's Ostrich library. The problem is, if you have no idea what the distribution of your data is, it's very hard to come up with good buckets; you could have an arbitrary amount of data going to one bucket, and thus tons of error.

1c. Use a TDigest, a compact, scalable data structure for approximating distributions that performs reasonably across a wide range of distributions. You would make one TDigest for each partition (with mapPartitions) and then merge all of the TDigests together. I wrote up a little more detail on this earlier; you can search spark-user on Nabble for tdigest.

2. Exact solutions. There are also a few options here, but I'll give one that is a variant of what you suggested. Start out by doing a sortByKey. Then figure out how many records you have in each partition (with mapPartitions). Figure out which partition the 66th percentile would be in. Then just read the one partition you want, and go down to the Nth record in that partition.
To read the one partition you want, you can either (a) use mapPartitionsWithIndex and just ignore every partition that isn't the one you want, or (b) use PartitionPruningRDD. PartitionPruningRDD will avoid launching empty tasks on the other partitions, so it will be slightly more efficient, but it's also a developer API, so perhaps not worth going to that level of detail. Note that internally, sortByKey will sample your data to get an approximate distribution, to figure out what data to put in each partition. However, you're still getting an exact answer this way -- the approximation is only important for distributing work among all the executors. Even if the approximation is inaccurate, you'll still correct for it; you will just have unequal partitions. Imran

On Sun, Feb 15, 2015 at 9:37 AM, SiMaYunRui myl...@hotmail.com wrote: hello, I am a newbie to Spark and trying to figure out how to compute a percentile over a big data set. Actually, I googled this topic but did not find any very useful code examples or explanations. It seems that I can use the sortByKey transformation to get my data set in order, but I'm not quite sure how I can then get the value of, for example, percentile 66. Should I use take() to pick up the value at percentile 66? I don't believe any machine can load my data set in memory. I believe there must be more efficient approaches. Can anyone shed some light on this problem? *Regards*
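The exact recipe above (sort, count records per partition, locate the partition holding the target rank, then read only that partition) can be sketched in plain Python. This is a stand-in for the Spark version, with a hypothetical function name and the simple floor(q% * n) rank rule as an assumption:

```python
def percentile_via_partitions(data, q, num_partitions=4):
    """Exact percentile: 'sortByKey', split into partitions, count
    each partition ('mapPartitions'), then index into just the one
    partition that contains the target global rank."""
    ordered = sorted(data)                               # the sortByKey step
    n = len(ordered)
    size = (n + num_partitions - 1) // num_partitions    # records per partition
    parts = [ordered[i:i + size] for i in range(0, n, size)]
    counts = [len(p) for p in parts]                     # per-partition counts
    rank = int(q / 100.0 * n)                            # global rank wanted
    for part, count in zip(parts, counts):
        if rank < count:
            return part[rank]                            # Nth record in that partition
        rank -= count                                    # skip this partition entirely
    return ordered[-1]                                   # q == 100 edge case

data = list(range(100, 0, -1))                           # 100..1, deliberately unsorted
print(percentile_via_partitions(data, 66))               # 67
```

In the distributed version only the per-partition counts travel to the driver; the final lookup touches a single partition, which is exactly what mapPartitionsWithIndex or PartitionPruningRDD buys you.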
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hi Emre, there shouldn't be any difference in which files get processed w/ print() vs. foreachRDD(). In fact, if you look at the definition of print(), it is just calling foreachRDD() underneath. So there is something else going on here. We need a little more information to figure out exactly what is going on. (I think Sean was getting at the same thing ...) (a) How do you know that when you use foreachRDD, all 20 files get processed? (b) How do you know that only 16 files get processed when you print()? Do you know the other files are being skipped, or maybe they are just stuck somewhere? E.g., suppose you start w/ 20 files, and you see 16 get processed ... what happens after you add a few more files to the directory? Are they processed immediately, or are they never processed either? (c) Can you share any more code of what you are doing to the dstreams *before* the print() / foreachRDD()? That might give us more details about what the difference is. I can't see how .count.print() would be different than just print(), but maybe I am missing something also. Imran

On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote: Sean, In this case, I've been testing the code on my local machine and using Spark locally, so all the log output was available on my terminal. And I've used the .print() method to have an output operation, just to force Spark to execute. I was not using foreachRDD; I was only using the print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything, because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with the foreachRDD approach for now.
(Though it is still a mystery to me why using .print() made a difference. Maybe my mental model of Spark is wrong; I thought that no matter what output operation I used, the number of files processed by Spark would be independent of it, because the processing is done in a different method, and .print() is only used to force Spark to execute that processing. Am I wrong?) -- Emre

On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it happened. Print should print some results to the console, but on different machines, so it may not be a reliable way to see what happened. Yes, I understand your real process uses foreachRDD, and that's what you should use. It sounds like that works. But you must always have been using that, right? What do you mean that you changed to use it? Basically I'm not clear on what the real code does, and what about the output of that code tells you only 16 files were processed.

On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sean, I did not understand your question very well, but what I do is check the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize the DAG via print() is not the way to go. (My interpretation might be wrong, but this is what I've just seen in my case.) -- Emre

On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so.
On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works. In my code I was trying to force Spark to output via:

jsonIn.print();

jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, hence the execution:

jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
    stringJavaRDD.collect();
    return null;
  }
});

it works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre

On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web
Re: MapValues and Shuffle Reads
Hi Darin, When you say you see 400GB of shuffle writes from the first code snippet, what do you mean? There is no action in that first set, so it won't do anything. By itself, it won't do any shuffle writing, or anything else for that matter. Most likely, the .count() on your second code snippet is actually causing the execution of some of the first snippet as well. The .partitionBy will result in both shuffle writes and shuffle reads, but they aren't set in motion until the .count further down the line. It's confusing b/c the stage boundaries don't line up exactly with your RDD variables here: hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged into the stage above it. If you do a hsfBaselinePairRDD.count after your first code snippet, and then run the second code snippet afterwards, is it more like what you expect? Imran

On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In the following code, I read in a large sequence file from S3 (1TB) spread across 1024 partitions. When I look at the job/stage summary, I see about 400GB of shuffle writes, which seems to make sense as I'm doing a hash partition on this file.

// Get the baseline input file
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD<String, String> hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution) and what I find very strange is that when I look at the job/stage summary, I see more than 340GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle read in this step.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDD<String, Long> baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());
log.info("Number of baseline records: " + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin.