Spark Streaming Stuck After 10mins Issue...
Hi,

I have a Spark Streaming application that reads messages from Kafka (multiple topics) and aggregates the data via updateStateByKey, with 50 Spark workers, each with 1 core and 6G RAM. It works fine for the first 10 minutes or so, but then it gets stuck in the foreachRDD function. Below are the log (repeating while stuck), the executor status, and the code.

Log:

15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_873_piece0
15/06/06 12:55:20 INFO BlockManager: Removing broadcast 875
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875_piece0
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875_piece0 of size 1820 dropped from memory (free 3369792320)
15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_875_piece0
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875 of size 2624 dropped from memory (free 3369794944)
*15/06/06 12:56:13 INFO MemoryStore: ensureFreeSpace(1650825) called with curMem=24335467, maxMem=3394130411
15/06/06 12:56:13 INFO MemoryStore: Block input-0-1433620573400 stored as bytes in memory (estimated size 1612.1 KB, free 3.1 GB)
15/06/06 12:56:13 INFO BlockManagerMaster: Updated info of block input-0-1433620573400
15/06/06 12:56:20 INFO MemoryStore: ensureFreeSpace(1682877) called with curMem=25986292, maxMem=3394130411
15/06/06 12:56:20 INFO MemoryStore: Block input-0-1433620579800 stored as bytes in memory (estimated size 1643.4 KB, free 3.1 GB)
15/06/06 12:56:20 INFO BlockManagerMaster: Updated info of block input-0-1433620579800
15/06/06 12:56:25 INFO MemoryStore: ensureFreeSpace(1642661) called with curMem=27669169, maxMem=3394130411
15/06/06 12:56:25 INFO MemoryStore: Block input-0-1433620585000 stored as bytes in memory (estimated size 1604.2 KB, free 3.1 GB)
15/06/06 12:56:25 INFO BlockManagerMaster: Updated info of block input-0-1433620585000*

Executor tasks (Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Shuffle Read Size / Records | Errors):

1 | 9087 | 0 | SUCCESS | PROCESS_LOCAL | 40 / 10.10.10.10 | 2015/06/06 12:54:26 | 3 s | | 7.7 MB (memory) / 11662 | 337.2 KB / 6588 |
0 | 9086 | 0 | RUNNING | PROCESS_LOCAL | 6 / 10.10.10.10 | 2015/06/06 12:54:26 | 3.7 min | 2 s | 7.8 MB (memory) / 11792 | 327.4 KB / 6422 |

Memory: 39.0 MB Used (160.1 GB Total); Disk: 0.0 B Used

Executors (Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write):

6 | 10.10.10.10:49298 | 2 | 15.6 MB / 3.2 GB | 0.0 B | 1 | 0 | 412 | 413 | 7.0 m | 1291.3 MB | 8.3 MB | 1437.1 KB
40 | 10.10.10.10:37480 | 3 | 23.4 MB / 3.2 GB | 0.0 B | 0 | 0 | 413 | 413 | 7.1 m | 1288.6 MB | 10.8 MB | 4.0 MB

Sample code:

val stateDStream = analyticsDStream.updateStateByKey[StreamBase](updateAnalyticsIterFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
stateDStream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    partition.foreach(record => {
      // Do nothing, and it is still stuck
    })
  })
})

There are no error messages, and the memory usage seems fine. Is there any reason it would get stuck? How can I resolve this issue?

Thank you for your help,
Eason

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Stuck-After-10mins-Issue-tp23189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
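As an aside for readers unfamiliar with updateStateByKey, its per-key contract (the update function receives the batch's new values plus the previous state, and returns the new state) can be simulated in plain Python without Spark. This is only a sketch of the semantics; update_state and update_func are illustrative names, not Spark API:

```python
# Minimal, Spark-free simulation of updateStateByKey semantics: for each
# key, the update function gets this batch's new values and the previous
# state (None on first sight), and returns the new state.
def update_state(batches, update_func):
    state = {}
    for batch in batches:
        # Group this batch's values by key.
        by_key = {}
        for k, v in batch:
            by_key.setdefault(k, []).append(v)
        # Keys with existing state but no new values are updated too,
        # which is why state can grow without bound unless pruned.
        for k in set(by_key) | set(state):
            state[k] = update_func(by_key.get(k, []), state.get(k))
    return state

# Running count per key, like a streaming word count over two batches.
counts = update_state(
    [[("a", 1), ("b", 1)], [("a", 1)]],
    lambda new, old: (old or 0) + sum(new),
)
```

Note the comment in the code: every known key is revisited every batch, so the per-batch cost of updateStateByKey grows with the total number of keys ever seen, which is one common reason such jobs slow down after the first several minutes.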
Re: Spark Streaming Stuck After 10mins Issue...
And here is the thread dump, where it seems every worker is waiting for Thread 95 on Executor #6, sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE), to complete:

Thread 41: BLOCK_MANAGER cleanup timer (WAITING)
Thread 42: BROADCAST_VARS cleanup timer (WAITING)
Thread 44: shuffle-client-0 (RUNNABLE)
Thread 45: shuffle-server-0 (RUNNABLE)
Thread 47: Driver Heartbeater (TIMED_WAITING)
Thread 48: Executor task launch worker-0 (RUNNABLE)
Thread 56: threadDeathWatcher-2-1 (TIMED_WAITING)
Thread 81: sparkExecutor-akka.actor.default-dispatcher-18 (WAITING)
Thread 95: sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE) **
  sun.management.ThreadImpl.dumpThreads0(Native Method)
  sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:446)
  org.apache.spark.util.Utils$.getThreadDump(Utils.scala:1777)
  org.apache.spark.executor.ExecutorActor$$anonfun$receiveWithLogging$1.applyOrElse(ExecutorActor.scala:38)
  scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
  org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  org.apache.spark.executor.ExecutorActor.aroundReceive(ExecutorActor.scala:34)
  akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  akka.actor.ActorCell.invoke(ActorCell.scala:487)
  akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  akka.dispatch.Mailbox.run(Mailbox.scala:220)
  akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) **
Thread 112: sparkExecutor-akka.actor.default-dispatcher-25 (WAITING)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Stuck-After-10mins-Issue-tp23189p23190.html
Re: SparkContext Threading
Hi Lee, it's actually not related to threading at all - you would still have the same problem even if you were using a single thread. See this section of the Spark docs:
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:

On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing around the context, and Spark has special logic to ensure that all variables in a closure used on an RDD are Serializable - I hate linking to Quora, but there's a good explanation here:
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark

Ah, I see! So if I broke out the lambda expressions into a method on an object it would prevent this issue. Essentially, don't use lambda expressions when using threads.

Thanks again, I appreciate the help.
Filling Parquet files by values in Value of a JavaPairRDD
Hello Sparkers,

I'm reading data from a CSV file, applying some transformations, and ending up with an RDD of (String, Iterable) pairs. I have already prepared Parquet files. I now want to take that (key, value) RDD and populate the Parquet files as follows:

- key holds the name of the Parquet file.
- value holds the values to save in the Parquet file whose name is the key.

I tried the simplest thing one can think of: creating a DataFrame inside a 'map' or 'foreach' on the pair RDD, but this gave a NullPointerException. I read around and found that this is because of nesting RDDs, which is not allowed.

Any help on how to achieve this another way?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Filling-Parquet-files-by-values-in-Value-of-a-JavaPairRDD-tp23188.html
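Since DataFrames cannot be created inside a map/foreach, the usual workaround is driver-side: determine the distinct keys, then filter and write one output per key. The shape of that "one output file per key" pattern can be sketched in plain Python with CSV standing in for Parquet; write_by_key is a hypothetical helper, not a Spark API:

```python
import csv
import os
import tempfile

def write_by_key(pairs, out_dir):
    """Write one CSV file per key, named after the key (illustration of
    the 'one output per key' pattern; Parquet would work the same way)."""
    grouped = {}
    for key, row in pairs:
        grouped.setdefault(key, []).append(row)
    paths = {}
    for key, rows in grouped.items():
        path = os.path.join(out_dir, f"{key}.csv")
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(rows)
        paths[key] = path
    return paths

out = tempfile.mkdtemp()
paths = write_by_key(
    [("events", [1, 2]), ("users", [3, 4]), ("events", [5, 6])],
    out,
)
```

In Spark itself the grouping step would be the expensive part (groupByKey or a per-key filter over a cached RDD); the writes happen one DataFrame at a time on the driver.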
RE: Problem getting program to run on 15TB input
Very interesting and relevant thread for production-level usage of Spark. @Arun, can you kindly confirm whether Daniel's suggestion helped your use case?

Thanks,
Kapil Malik | kma...@adobe.com | 33430 / 8800836581

From: Daniel Mahler [mailto:dmah...@gmail.com]
Sent: 13 April 2015 15:42
To: Arun Luthra
Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
Subject: Re: Problem getting program to run on 15TB input

Sometimes a large number of partitions leads to memory problems. Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.

On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:

Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

I tried a shorter, simpler version of the program, with just 1 RDD; essentially it is:

sc.textFile(..., N).map().filter().map( blah => (id, 1L)).reduceByKey().saveAsTextFile(...)
Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)] 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)] 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)] 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)] 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)] 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)] 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs]

So ~9GB is getting GC'ed every few seconds, which seems like a lot.

Question: the filter() is removing 99% of the data. Does this 99% of the data get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor-memory and fewer executors (the most important thing was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space

On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote:

The Spark UI names the line number and name of the operation (repartition in this case) that it is performing.
Only if this information is wrong (just a possibility) could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey), which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts before doing groupByKey.

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote:

All stated symptoms are consistent with GC pressure (other nodes time out trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler, or do you base that assumption only on logs?

On Sat, 28 Feb 2015, 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote:

A correction to my first post: there is also a repartition right before groupByKey, to help avoid a too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote:

The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
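Arun's "count first, then drop hot keys" idea can be sketched without a cluster; in Spark this is what countByKey plus a filter would do before the expensive groupByKey. A plain-Python sketch, where skewed_keys and the threshold fraction are illustrative choices:

```python
from collections import Counter

def skewed_keys(pairs, threshold):
    """Cheap skew check: count records per key (the analogue of Spark's
    countByKey) and flag keys whose share of all records exceeds the
    given fraction. Flagged keys can be logged and filtered out before
    a groupByKey-style operation."""
    counts = Counter(k for k, _ in pairs)
    total = sum(counts.values())
    return {k: c for k, c in counts.items() if c / total > threshold}

# One key holds 98% of the records; it is the only one flagged at 50%.
data = [("hot", i) for i in range(98)] + [("cold", 0), ("warm", 0)]
hot = skewed_keys(data, threshold=0.5)
```

Because counting is a cheap, fully combinable aggregation, this pass costs far less than the grouping it protects, which is why it is worth running first on skew-prone data.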
which database for gene alignment data ?
I want to use Spark to read compressed .bed files containing gene sequencing alignment data. I want to store the bed file data in a DB and then use external gene expression data to find overlaps etc. Which database is best for this?

Thanks
-Roni
Re: Which class takes place of BlockManagerWorker in Spark 1.3.1
Hi,

Please take a look at:

[SPARK-3019] Pluggable block transfer interface (BlockTransferService)
- NioBlockTransferService implements BlockTransferService and replaces the old BlockManagerWorker

Cheers

On Sat, Jun 6, 2015 at 2:23 AM, bit1...@163.com bit1...@163.com wrote:

Hi, I remember that there was a class called BlockManagerWorker in previous Spark releases. In the 1.3.1 code, I can see that some method comments still refer to BlockManagerWorker, which doesn't exist at all. May I ask which class takes the place of BlockManagerWorker in Spark 1.3.1? Thanks. BTW, BlockManagerMaster is still there; it makes no sense that BlockManagerWorker is gone.

--
bit1...@163.com
Logging in spark-shell on master
Hello, I've created a spark cluster on ec2 using the spark-ec2 script. I would like to be able to modify the logging level of the spark-shell when it is running on the master. I've copied the log4j.properties template file and changed the root logger level to WARN and that doesn't seem to have any effect. I still see tons of INFO messages in the shell. I can import the log4j classes at the shell prompt and call Logger.getLogger(org).setLevel(Level.WARN) to set the level and that does work but I'd prefer to not have to do that every time I run the shell. Any suggestions? Thanks.
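For reference, the relevant file is conf/log4j.properties, copied from conf/log4j.properties.template in the Spark distribution; it must exist in $SPARK_HOME/conf on the machine where the shell (the driver) actually runs, which on a spark-ec2 cluster is the master. A minimal version with the root level lowered to WARN looks roughly like this in Spark 1.x (the exact appender and pattern lines may differ slightly between releases, so treat this as a sketch rather than the canonical template):

```properties
# conf/log4j.properties (based on conf/log4j.properties.template)
# Lower the root level from the template's INFO to WARN.
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

If INFO messages still appear after this, it usually means a different log4j.properties is being picked up earlier on the driver's classpath, which is worth checking before scripting the Logger.getLogger workaround.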
Which class takes place of BlockManagerWorker in Spark 1.3.1
Hi,

I remember that there was a class called BlockManagerWorker in previous Spark releases. In the 1.3.1 code, I can see that some method comments still refer to BlockManagerWorker, which doesn't exist at all. May I ask which class takes the place of BlockManagerWorker in Spark 1.3.1? Thanks.

BTW, BlockManagerMaster is still there; it makes no sense that BlockManagerWorker is gone.

bit1...@163.com
Re: which database for gene alignment data ?
Can you describe your use case in a bit more detail, since not all people on this mailing list are familiar with gene sequencing alignment data?

Thanks

On Fri, Jun 5, 2015 at 11:42 PM, roni roni.epi...@gmail.com wrote:

I want to use Spark to read compressed .bed files containing gene sequencing alignment data. I want to store the bed file data in a DB and then use external gene expression data to find overlaps etc. Which database is best for this?

Thanks
-Roni
write multiple outputs by key
Hi,

How can I write to multiple outputs for each key? I tried to create a custom partitioner and to define the number of partitions, but it does not work. Only a few tasks/partitions (equal to the number of all key combinations) get large datasets; the data is not split across all tasks/partitions. The job failed because those few tasks handled far too large datasets. Below is my code snippet.

val varWFlatRDD = varWRDD.map(FlatMapUtilClass().flatKeyFromWrf).groupByKey() // keys are (zone, z, year, month)
  .foreach( x => {
    val z = x._1._1
    val year = x._1._2
    val month = x._1._3
    val df_table_4dim = x._2.toList.toDF()
    df_table_4dim.registerTempTable("table_4Dim")
    hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
      "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
  })

From the Spark history UI: at groupByKey there are 1000 tasks (equal to the parent's number of partitions); at foreach there are 1000 tasks as well, but only 50 tasks (the same as the number of all key combinations) get datasets.

How can I fix this problem? Any suggestions are appreciated.

BR,
Patcharee
Re: SparkContext Threading
Hi Will,

That doesn't seem to be the case, and was part of the source of my confusion. The code currently in the run method of the runnable works perfectly fine with the lambda expressions when it is invoked from the main method. They also work when they are invoked from within a separate method on the Transforms object. It was only when putting that same code into another thread that the serialization exception occurred.

Examples throughout the Spark docs also use lambda expressions a lot - surely those examples also would not work if this were always an issue with lambdas?

On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

Hi Lee, it's actually not related to threading at all - you would still have the same problem even if you were using a single thread. See this section of the Spark docs:
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:

On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing around the context, and Spark has special logic to ensure that all variables in a closure used on an RDD are Serializable - I hate linking to Quora, but there's a good explanation here:
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark

Ah, I see! So if I broke out the lambda expressions into a method on an object it would prevent this issue. Essentially, don't use lambda expressions when using threads.

Thanks again, I appreciate the help.
Don't understand the numbers on the Storage UI(/storage/rdd/?id=4)
Hi,

I run a word count application on a 600M text file, and give the RDD a StorageLevel of StorageLevel.MEMORY_AND_DISK_2. I have two questions that I can't explain:

1. The StorageLevel shown on the UI is "Disk Serialized 2x Replicated", but I am using StorageLevel.MEMORY_AND_DISK_2; where is the Memory info?

Storage Level: Disk Serialized 2x Replicated
Cached Partitions: 20
Total Partitions: 20
Memory Size: 107.6 MB
Disk Size: 277.1 MB

2. My text file is 600M, but the memory and disk size shown above is about 400M total (107.6 MB + 277.1 MB), and I am using 2 replications, so in my opinion it should be about 600M * 2. Does some compression happen under the scenes, or something else?

Thanks!
bit1...@163.com
hiveContext.sql NullPointerException
Hi,

I try to insert data into a partitioned Hive table. The groupByKey is to combine the dataset into a partition of the Hive table. After the groupByKey, I converted the Iterable[X] to a DataFrame by X.toList.toDF(). But hiveContext.sql throws a NullPointerException, see below. Any suggestions? What could be wrong? Thanks!

val varWHeightFlatRDD = varWHeightRDD.flatMap(FlatMapUtilClass().flatKeyFromWrf).groupByKey()
  .foreach( x => {
    val zone = x._1._1
    val z = x._1._2
    val year = x._1._3
    val month = x._1._4
    val df_table_4dim = x._2.toList.toDF()
    df_table_4dim.registerTempTable("table_4Dim")
    hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + zone + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
      "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
  })

java.lang.NullPointerException
  at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:100)
  at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:113)
  at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:103)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
Re: SparkContext Threading
Hi Lee,

I'm stuck with only mobile devices for correspondence right now, so I can't get to a shell to play with this issue - this is all supposition. I think that the lambdas are closing over the context because it's a constructor parameter to your Runnable class, which is why inlining the lambdas into your main method doesn't show this issue.

On Sat, Jun 6, 2015, 10:55 AM Lee McFadden splee...@gmail.com wrote:

Hi Will,

That doesn't seem to be the case, and was part of the source of my confusion. The code currently in the run method of the runnable works perfectly fine with the lambda expressions when it is invoked from the main method. They also work when they are invoked from within a separate method on the Transforms object. It was only when putting that same code into another thread that the serialization exception occurred.

Examples throughout the Spark docs also use lambda expressions a lot - surely those examples also would not work if this were always an issue with lambdas?

On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

Hi Lee, it's actually not related to threading at all - you would still have the same problem even if you were using a single thread. See this section of the Spark docs:
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:

On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing around the context, and Spark has special logic to ensure that all variables in a closure used on an RDD are Serializable - I hate linking to Quora, but there's a good explanation here:
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark

Ah, I see! So if I broke out the lambda expressions into a method on an object it would prevent this issue. Essentially, don't use lambda expressions when using threads.

Thanks again, I appreciate the help.
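The closure-capture problem Will describes has a close analogue in Python's pickle module, which makes it easy to see without a cluster. In the sketch below, the Job class and its lambda attribute are stand-ins: the lambda plays the role of the non-serializable SparkContext held by the Runnable, and the bound method plays the role of a lambda that closes over it:

```python
import pickle

class Job:
    """Stand-in for a class that holds a non-serializable field."""
    def __init__(self):
        # A lambda created here is not picklable; it stands in for the
        # SparkContext that gets captured along with `self`.
        self.ctx = lambda: None

    def transform(self, x):
        return x + 1

def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

job = Job()

# Serializing the bound method drags `job` (and its unpicklable field)
# into the payload, so it fails...
captures_self = can_pickle(job.transform)

# ...while a free function with no captured state serializes fine,
# which mirrors "break the lambda out into a method on an object".
def add_one(x):
    return x + 1

standalone = can_pickle(add_one)
```

The mechanism differs (Java serialization vs pickle), but the rule of thumb is the same in both worlds: functions shipped to workers should not drag a reference to the context along with them.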
Re: Access several s3 buckets, with credentials containing /
Hi Pierre,

One way is to recreate your credentials until AWS generates one without a slash character in it. Another way I've been using is to pass these credentials outside the S3 file path by setting the following (where sc is the SparkContext):

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY)

After that you can define the RDDs more simply:

val c1 = sc.textFile("s3n://bucket1/file.csv")

-sujit

On Fri, Jun 5, 2015 at 3:55 AM, Steve Loughran ste...@hortonworks.com wrote:

On 5 Jun 2015, at 08:03, Pierre B pierre.borckm...@realimpactanalytics.com wrote:

Hi list!

My problem is quite simple. I need to access several S3 buckets, using different credentials:

```
val c1 = sc.textFile("s3n://[ACCESS_KEY_ID1:SECRET_ACCESS_KEY1]@bucket1/file.csv").count
val c2 = sc.textFile("s3n://[ACCESS_KEY_ID2:SECRET_ACCESS_KEY2]@bucket2/file.csv").count
val c3 = sc.textFile("s3n://[ACCESS_KEY_ID3:SECRET_ACCESS_KEY3]@bucket3/file.csv").count
...
```

One or several of those AWS credentials might contain '/' in the private access key. This is a known problem, and from my research the only ways to deal with these '/' are:

1/ use environment variables to set the AWS credentials, then access the s3 buckets without specifying the credentials
2/ set the hadoop configuration to contain the credentials.

However, neither of these solutions allows me to access different buckets with different credentials.

Can anyone help me with this?

Thanks

Pierre

It's a long-known outstanding bug in Hadoop s3n that nobody has ever sat down to fix. One subtlety is that it's really hard to test, as you need credentials with a '/' in them. The general best practice is: recreate your credentials.

Now, if you can get the patch to work against Hadoop trunk, I promise I will commit it:
https://issues.apache.org/jira/browse/HADOOP-3733
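Why a '/' in the secret breaks the URI form is visible with any generic URL parser: the slash is read as the start of the path, truncating the credentials in the authority component. Percent-encoding fixes the parse, although the old Hadoop s3n code did not reliably decode it (that is what HADOOP-3733, linked above, is about), hence the "recreate your credentials" advice. A plain-Python illustration with made-up credentials:

```python
from urllib.parse import quote, urlsplit

SECRET = "abc/def+ghi"  # hypothetical secret key containing '/'

# Raw: the first '/' in the secret is parsed as the start of the URL
# path, so the authority (and with it the credentials) is cut short.
raw = urlsplit(f"s3n://AKIAEXAMPLE:{SECRET}@bucket1/file.csv")
truncated_netloc = raw.netloc

# Percent-encoded: the whole secret survives inside the authority.
encoded = urlsplit(f"s3n://AKIAEXAMPLE:{quote(SECRET, safe='')}@bucket1/file.csv")
intact_netloc = encoded.netloc
```

This is exactly why Sujit's approach of moving the credentials into the Hadoop configuration, out of the URI entirely, sidesteps the problem.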
Re: write multiple outputs by key
I believe groupByKey currently requires that all items for a specific key fit into a single executor's memory:
http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

This previous discussion has some pointers if you must use groupByKey, including adding a low-cardinality hash to your key:
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html

Another option I didn't see mentioned would be to persist/cache the initial RDD, calculate the set of distinct key values out of it, and then derive a set of filtered RDDs from the cached dataset, one for each key. For this to work, your set of unique keys would need to fit into your driver's memory.

Regards,
Will

On June 6, 2015, at 11:07 AM, patcharee patcharee.thong...@uni.no wrote:

Hi,

How can I write to multiple outputs for each key? I tried to create a custom partitioner and to define the number of partitions, but it does not work. Only a few tasks/partitions (equal to the number of all key combinations) get large datasets; the data is not split across all tasks/partitions. The job failed because those few tasks handled far too large datasets. Below is my code snippet.

val varWFlatRDD = varWRDD.map(FlatMapUtilClass().flatKeyFromWrf).groupByKey() // keys are (zone, z, year, month)
  .foreach( x => {
    val z = x._1._1
    val year = x._1._2
    val month = x._1._3
    val df_table_4dim = x._2.toList.toDF()
    df_table_4dim.registerTempTable("table_4Dim")
    hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
      "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
  })

From the Spark history UI: at groupByKey there are 1000 tasks (equal to the parent's number of partitions); at foreach there are 1000 tasks as well, but only 50 tasks (the same as the number of all key combinations) get datasets.

How can I fix this problem? Any suggestions are appreciated.

BR,
Patcharee
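The "low-cardinality hash" (key salting) pointer above can be sketched without Spark: append a small salt to each key, aggregate per salted key so one hot key is spread over several sub-keys, then strip the salt and combine the much smaller partial results. In the plain-Python sketch below, salt_and_aggregate and the round-robin salt are illustrative choices; in Spark the two passes would typically be two reduceByKey stages:

```python
def salt_and_aggregate(pairs, buckets=4):
    """Spread a skewed key over `buckets` sub-keys, then merge.

    Pass 1: aggregate per (key, salt), so no single reducer sees all of
    a hot key's records. Pass 2: strip the salt and combine the partial
    sums, which is cheap because there are at most `buckets` of them
    per key."""
    step1 = {}
    for i, (k, v) in enumerate(pairs):
        sk = (k, i % buckets)  # deterministic round-robin salt for illustration
        step1[sk] = step1.get(sk, 0) + v
    final = {}
    for (k, _), v in step1.items():
        final[k] = final.get(k, 0) + v
    return step1, final

# One key carries ~99% of the records; salting splits it four ways.
data = [("hot", 1)] * 100 + [("cold", 1)]
partial, totals = salt_and_aggregate(data)
```

This only helps when the per-key work is combinable (sums, counts, partial aggregates); it does not make an inherently all-values-in-memory groupByKey safe on its own.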