Spark Streaming Stuck After 10mins Issue...

2015-06-06 Thread EH
Hi,

I have a Spark Streaming application that reads messages from Kafka
(multiple topics) and aggregates the data via updateStateByKey, using
50 Spark workers with 1 core and 6G RAM each.  It works fine for the
first 10 minutes or so, but then it gets stuck in the foreachRDD function.
Below is the log (repeating while stuck), the executor status, and the code.


Log:
15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block
broadcast_873_piece0
15/06/06 12:55:20 INFO BlockManager: Removing broadcast 875
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875_piece0
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875_piece0 of size 1820
dropped from memory (free 3369792320)
15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block
broadcast_875_piece0
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875 of size 2624 dropped
from memory (free 3369794944)
15/06/06 12:56:13 INFO MemoryStore: ensureFreeSpace(1650825) called with
curMem=24335467, maxMem=3394130411
15/06/06 12:56:13 INFO MemoryStore: Block input-0-1433620573400 stored as
bytes in memory (estimated size 1612.1 KB, free 3.1 GB)
15/06/06 12:56:13 INFO BlockManagerMaster: Updated info of block
input-0-1433620573400
15/06/06 12:56:20 INFO MemoryStore: ensureFreeSpace(1682877) called with
curMem=25986292, maxMem=3394130411
15/06/06 12:56:20 INFO MemoryStore: Block input-0-1433620579800 stored as
bytes in memory (estimated size 1643.4 KB, free 3.1 GB)
15/06/06 12:56:20 INFO BlockManagerMaster: Updated info of block
input-0-1433620579800
15/06/06 12:56:25 INFO MemoryStore: ensureFreeSpace(1642661) called with
curMem=27669169, maxMem=3394130411
15/06/06 12:56:25 INFO MemoryStore: Block input-0-1433620585000 stored as
bytes in memory (estimated size 1604.2 KB, free 3.1 GB)
15/06/06 12:56:25 INFO BlockManagerMaster: Updated info of block
input-0-1433620585000


Executor:

Tasks:
Index 1 | ID 9087 | Attempt 0 | SUCCESS | PROCESS_LOCAL | 40 / 10.10.10.10 |
  Launch Time 2015/06/06 12:54:26 | Duration 3 s |
  Input Size / Records 7.7 MB (memory) / 11662 | Shuffle Read Size / Records 337.2 KB / 6588
Index 0 | ID 9086 | Attempt 0 | RUNNING | PROCESS_LOCAL | 6 / 10.10.10.10 |
  Launch Time 2015/06/06 12:54:26 | Duration 3.7 min | GC Time 2 s |
  Input Size / Records 7.8 MB (memory) / 11792 | Shuffle Read Size / Records 327.4 KB / 6422

Executors:
Memory: 39.0 MB Used (160.1 GB Total)
Disk: 0.0 B Used
Executor ID 6  | 10.10.10.10:49298 | RDD Blocks 2 | Memory Used 15.6 MB / 3.2 GB | Disk Used 0.0 B |
  Active Tasks 1 | Failed Tasks 0 | Complete Tasks 412 | Total Tasks 413 | Task Time 7.0 m |
  Input 1291.3 MB | Shuffle Read 8.3 MB | Shuffle Write 1437.1 KB
Executor ID 40 | 10.10.10.10:37480 | RDD Blocks 3 | Memory Used 23.4 MB / 3.2 GB | Disk Used 0.0 B |
  Active Tasks 0 | Failed Tasks 0 | Complete Tasks 413 | Total Tasks 413 | Task Time 7.1 m |
  Input 1288.6 MB | Shuffle Read 10.8 MB | Shuffle Write 4.0 MB


Sample code:
val stateDStream =
  analyticsDStream.updateStateByKey[StreamBase](updateAnalyticsIterFunc,
    new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
stateDStream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    partition.foreach(record => {
      // Do nothing, and it is still stuck
    })
  })
})

There are no error messages, and the memory usage seems fine.  Is
there any reason it would get stuck?  How can I resolve this issue?

Thank you for your help,
Eason







Re: Spark Streaming Stuck After 10mins Issue...

2015-06-06 Thread EH
And here is the thread dump, where it seems every worker is waiting for Executor
#6's Thread 95: sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE) to
complete:

Thread 41: BLOCK_MANAGER cleanup timer (WAITING)
Thread 42: BROADCAST_VARS cleanup timer (WAITING)
Thread 44: shuffle-client-0 (RUNNABLE)
Thread 45: shuffle-server-0 (RUNNABLE)
Thread 47: Driver Heartbeater (TIMED_WAITING)
Thread 48: Executor task launch worker-0 (RUNNABLE)
Thread 56: threadDeathWatcher-2-1 (TIMED_WAITING)
Thread 81: sparkExecutor-akka.actor.default-dispatcher-18 (WAITING)
Thread 95: sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE)
**
sun.management.ThreadImpl.dumpThreads0(Native Method)
sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:446)
org.apache.spark.util.Utils$.getThreadDump(Utils.scala:1777)
org.apache.spark.executor.ExecutorActor$$anonfun$receiveWithLogging$1.applyOrElse(ExecutorActor.scala:38)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.spark.executor.ExecutorActor.aroundReceive(ExecutorActor.scala:34)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
akka.dispatch.Mailbox.run(Mailbox.scala:220)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
**
Thread 112: sparkExecutor-akka.actor.default-dispatcher-25 (WAITING)






Re: SparkContext Threading

2015-06-06 Thread Will Briggs
Hi Lee, it's actually not related to threading at all - you would still have 
the same problem even if you were using a single thread. See this section ( 
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs. 

On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:

On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing 
around the context, and Spark has special logic to ensure that all variables in 
a closure used on an RDD are Serializable - I hate linking to Quora, but 
there's a good explanation here: 
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


Ah, I see!  So if I broke out the lambda expressions into a method on an object 
it would prevent this issue.  Essentially, don't use lambda expressions when 
using threads.


Thanks again, I appreciate the help. 



Filling Parquet files by values in Value of a JavaPairRDD

2015-06-06 Thread Mohamed Nadjib Mami
Hello Sparkers,

I'm reading data from a CSV file, applying some transformations, and ending
up with an RDD of pairs (String, Iterable).

I have already prepared Parquet files. I now want to take the previous
(key, value) RDD and populate the Parquet files as follows:
- the key holds the name of the Parquet file.
- the value holds the values to save in the Parquet file whose name is the key.

I tried the simplest way that one can think of: creating a DataFrame inside
a 'map' or 'foreach' on the pair RDD, but this gave a NullPointerException. I
read and found that this is because of nesting RDDs, which is not allowed.

Any suggestions on how to achieve this in another way?
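
Concretely, what I tried looks roughly like the following sketch (written in
Scala for brevity, while my real code uses the Java API; the Record type, the
sample keys, the output path, and the usual sc/sqlContext are assumptions):

case class Record(a: String, b: Int)

// pairs: (parquetFileName, values), as produced by my CSV transformations
val pairs = sc.parallelize(Seq(
  ("fileA", Iterable(Record("x", 1), Record("y", 2))),
  ("fileB", Iterable(Record("z", 3)))))

import sqlContext.implicits._
pairs.foreach { case (fileName, values) =>
  // This closure runs on an executor, and creating a DataFrame there is
  // apparently what blows up with the NullPointerException.
  val df = values.toList.toDF()
  df.saveAsParquetFile(s"/output/$fileName.parquet")
}

Would the right approach instead be to collect the distinct keys on the driver
and then filter and write one DataFrame per key from there?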






RE: Problem getting program to run on 15TB input

2015-06-06 Thread Kapil Malik
Very interesting and relevant thread for production-level usage of Spark.

@Arun, can you kindly confirm whether Daniel's suggestion helped your use case?

Thanks,

Kapil Malik | kma...@adobe.com | 33430 / 8800836581

From: Daniel Mahler [mailto:dmah...@gmail.com]
Sent: 13 April 2015 15:42
To: Arun Luthra
Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
Subject: Re: Problem getting program to run on 15TB input

Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:
Everything works smoothly if I do the 99%-removal filter in Hive first. So, all 
the baggage from garbage collection was breaking it.

Is there a way to filter() out 99% of the data without having to garbage 
collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:
I tried a shorter, simpler version of the program, with just 1 RDD; essentially
it is:

sc.textFile(..., N).map().filter().map( blah => (id, 1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
real=0.02 secs]

So ~9GB is getting GC'ed every few seconds. Which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the data 
get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of 
executor-cores (to 2), based on suggestions at 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor-memory and fewer executors (the most important thing
was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space




On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote:
The Spark UI names the line number and name of the operation (repartition in 
this case) that it is performing. Only if this information is wrong (just a 
possibility), could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey (or 
simply countByKey) which is relatively inexpensive. For the purposes of this 
algorithm I can simply log and remove keys with huge counts, before doing 
groupByKey.
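
Something roughly like this is what I have in mind (a sketch only; the key
extraction, partition count, and threshold are placeholders, not the real
pipeline):

import org.apache.spark.SparkContext._   // pair-RDD implicits

val pairs = sc.textFile("input", 15000).map(line => (line.split(",")(0), 1L))

val counts = pairs.countByKey()          // Map[key, count], collected on the driver
val threshold = 1000000L
val skewedKeys = counts.filter { case (_, c) => c > threshold }.keys.toSet
skewedKeys.foreach(k => println(s"Dropping skewed key $k (${counts(k)} records)"))

val skewedBc = sc.broadcast(skewedKeys)
val trimmed = pairs.filter { case (k, _) => !skewedBc.value.contains(k) }
// trimmed then goes through repartition(...).groupByKey() as before.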

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote:
All stated symptoms are consistent with GC pressure (other nodes timeout trying 
to connect because of a long stop-the-world), quite possibly due to groupByKey. 
groupByKey is a very expensive operation as it may bring all the data for a 
particular partition into memory (in particular, it cannot spill values for a 
single key, so if you have a single very skewed key you can get behavior like 
this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

But groupByKey will repartition according to the number of keys, as I understand
how it works. How do you know that you haven't reached the groupByKey phase? Are
you using a profiler, or do you base that assumption only on logs?

On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote:

A correction to my first post:

There is also a repartition right before groupByKey to help avoid 
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote:
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 

which database for gene alignment data ?

2015-06-06 Thread roni
I want to use Spark to read compressed .bed files containing gene sequencing
alignment data.
I want to store the .bed file data in a database and then use external gene
expression data to find overlaps etc. Which database is best for this?
Thanks
-Roni


Re: Which class takes place of BlockManagerWorker in Spark 1.3.1

2015-06-06 Thread Ted Yu
Hi,
Please take a look at:
[SPARK-3019] Pluggable block transfer interface (BlockTransferService)

- NioBlockTransferService implements BlockTransferService and replaces the
old BlockManagerWorker

Cheers

On Sat, Jun 6, 2015 at 2:23 AM, bit1...@163.com bit1...@163.com wrote:

 Hi,
 I remember that there was a class called BlockManagerWorker in previous
 Spark releases. In the 1.3.1 code, I can see that some method
 comments still refer to BlockManagerWorker, which doesn't exist at all.
 Which class takes the place of BlockManagerWorker in Spark 1.3.1?
 Thanks.

 BTW, BlockManagerMaster is still there, so it seems odd that
 BlockManagerWorker is simply gone.

 --
 bit1...@163.com



Logging in spark-shell on master

2015-06-06 Thread Robert Pond
Hello,

I've created a Spark cluster on EC2 using the spark-ec2 script. I would
like to be able to modify the logging level of the spark-shell when it is
running on the master. I've copied the log4j.properties template file and
changed the root logger level to WARN, but that doesn't seem to have any
effect. I still see tons of INFO messages in the shell.
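
For reference, the copied file looks roughly like this after my change (I'm
quoting it from memory, so the exact entries may differ from the template):

# conf/log4j.properties (copied from conf/log4j.properties.template)
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n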

I can import the log4j classes at the shell prompt and call

Logger.getLogger("org").setLevel(Level.WARN)

to set the level, and that does work, but I'd prefer not to have to do that
every time I run the shell.

Any suggestions?

Thanks.


Which class takes place of BlockManagerWorker in Spark 1.3.1

2015-06-06 Thread bit1...@163.com
Hi,
I remember that there was a class called BlockManagerWorker in previous Spark
releases. In the 1.3.1 code, I can see that some method comments still refer
to BlockManagerWorker, which doesn't exist at all.
Which class takes the place of BlockManagerWorker in Spark 1.3.1?
Thanks.

BTW, BlockManagerMaster is still there, so it seems odd that BlockManagerWorker
is simply gone.



bit1...@163.com


Re: which database for gene alignment data ?

2015-06-06 Thread Ted Yu
Can you describe your use case in a bit more detail, since not all people on
this mailing list are familiar with gene sequencing alignment data?

Thanks

On Fri, Jun 5, 2015 at 11:42 PM, roni roni.epi...@gmail.com wrote:

 I want to use Spark to read compressed .bed files containing gene sequencing
 alignment data.
 I want to store the .bed file data in a database and then use external gene
 expression data to find overlaps etc. Which database is best for this?
 Thanks
 -Roni




write multiple outputs by key

2015-06-06 Thread patcharee

Hi,

How can I write to multiple outputs for each key? I tried to create a
custom partitioner and to define the number of partitions, but it does not work.
Only a few tasks/partitions (the number equals the number of
all key combinations) get large datasets; the data is not split across all
tasks/partitions. The job failed because those few tasks handled far too large
datasets. Below is my code snippet.


val varWFlatRDD =
  varWRDD.map(FlatMapUtilClass().flatKeyFromWrf).groupByKey() // keys are (zone, z, year, month)
    .foreach(
      x => {
        val z = x._1._1
        val year = x._1._2
        val month = x._1._3
        val df_table_4dim = x._2.toList.toDF()
        df_table_4dim.registerTempTable("table_4Dim")
        hiveContext.sql("INSERT OVERWRITE table 4dim partition " +
          "(zone=" + ZONE + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
          "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, " +
          "qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
      })


From the Spark history UI, at groupByKey there are 1000 tasks (equal
to the parent's number of partitions). At foreach there are 1000 tasks as
well, but only 50 tasks (the same as the number of all key combinations) get datasets.


How can I fix this problem? Any suggestions are appreciated.

BR,
Patcharee






Re: SparkContext Threading

2015-06-06 Thread Lee McFadden
Hi Will,

That doesn't seem to be the case and was part of the source of my
confusion. The code currently in the run method of the runnable works
perfectly fine with the lambda expressions when it is invoked from the main
method. They also work when they are invoked from within a separate method
on the Transforms object.

It was only when putting that same code into another thread that the
serialization exception occurred.

Examples throughout the spark docs also use lambda expressions a lot -
surely those examples also would not work if this is always an issue with
lambdas?

On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

 Hi Lee, it's actually not related to threading at all - you would still
 have the same problem even if you were using a single thread. See this
 section (
 https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs.


 On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:


 On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

 Your lambda expressions on the RDDs in the SecondRollup class are closing
 around the context, and Spark has special logic to ensure that all
 variables in a closure used on an RDD are Serializable - I hate linking to
 Quora, but there's a good explanation here:
 http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


 Ah, I see!  So if I broke out the lambda expressions into a method on an
 object it would prevent this issue.  Essentially, don't use lambda
 expressions when using threads.

 Thanks again, I appreciate the help.



Don't understand the numbers on the Storage UI(/storage/rdd/?id=4)

2015-06-06 Thread bit1...@163.com
Hi, 
I run a word count application on a 600M text file, and set the RDD's
StorageLevel to StorageLevel.MEMORY_AND_DISK_2.
I have two questions that I can't explain:
1. The StorageLevel shown on the UI is Disk Serialized 2x Replicated, but I am
using StorageLevel.MEMORY_AND_DISK_2, so where is the Memory info?
Storage Level: Disk Serialized 2x Replicated 
Cached Partitions: 20 
Total Partitions: 20 
Memory Size: 107.6 MB 
Disk Size: 277.1 MB 

2. My text file is 600M, but the memory and disk size shown above is about 400M
in total (107.6 MB + 277.1 MB), and I am using 2 replications, so in my opinion it
should be about 600M * 2. Does some compression happen behind the scenes, or
something else?
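
For reference, the relevant part of my code is roughly this (a sketch, not the
exact program; the file name is a placeholder):

import org.apache.spark.storage.StorageLevel

// The 600M text file RDD, persisted with memory+disk and 2x replication.
val lines = sc.textFile("input.txt").persist(StorageLevel.MEMORY_AND_DISK_2)
val counts = lines.flatMap(_.split("\\s+")).map(word => (word, 1L)).reduceByKey(_ + _)
counts.count()   // triggers the persistence; the RDD then shows up on the Storage tab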

Thanks!


bit1...@163.com


hiveContext.sql NullPointerException

2015-06-06 Thread patcharee

Hi,

I am trying to insert data into a partitioned Hive table. The groupByKey is to
combine the dataset into a partition of the Hive table. After the
groupByKey, I converted the Iterable[X] to a DataFrame by X.toList.toDF(). But
hiveContext.sql throws a NullPointerException, see below. Any
suggestions? What could be wrong? Thanks!


val varWHeightFlatRDD =
  varWHeightRDD.flatMap(FlatMapUtilClass().flatKeyFromWrf).groupByKey()
    .foreach(
      x => {
        val zone = x._1._1
        val z = x._1._2
        val year = x._1._3
        val month = x._1._4
        val df_table_4dim = x._2.toList.toDF()
        df_table_4dim.registerTempTable("table_4Dim")
        hiveContext.sql("INSERT OVERWRITE table 4dim partition " +
          "(zone=" + zone + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
          "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, " +
          "qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
      })


java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:100)
at 
no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:113)
at 
no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:103)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)




Re: SparkContext Threading

2015-06-06 Thread William Briggs
Hi Lee, I'm stuck with only mobile devices for correspondence right now, so
I can't get to a shell to play with this issue - this is all supposition; I
think that the lambdas are closing over the context because it's a
constructor parameter to your Runnable class, which is why inlining the
lambdas into your main method doesn't show this issue.
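
A minimal sketch of what I mean (the constructor parameters, field names, and
method bodies here are made up, not your actual code):

import org.apache.spark.SparkContext

// `prefix` becomes a field, so the lambda captures `this`, and `this` also
// drags in the non-serializable SparkContext held by the constructor.
class SecondRollup(sc: SparkContext, prefix: String) extends Runnable {
  def run(): Unit = {
    val tagged = sc.parallelize(Seq("a", "b", "c")).map(s => prefix + s) // captures `this`
    tagged.count()                                                       // Task not serializable
  }
}

// One way around it: put the function on a standalone object and copy the
// needed value into a local val, so the closure never references `this`.
object Transforms {
  def tag(prefix: String)(s: String): String = prefix + s
}

class SecondRollupFixed(sc: SparkContext, prefix: String) extends Runnable {
  def run(): Unit = {
    val p = prefix
    sc.parallelize(Seq("a", "b", "c")).map(Transforms.tag(p)).count()
  }
}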

On Sat, Jun 6, 2015, 10:55 AM Lee McFadden splee...@gmail.com wrote:

 Hi Will,

 That doesn't seem to be the case and was part of the source of my
 confusion. The code currently in the run method of the runnable works
 perfectly fine with the lambda expressions when it is invoked from the main
 method. They also work when they are invoked from within a separate method
 on the Transforms object.

 It was only when putting that same code into another thread that the
 serialization exception occurred.

 Examples throughout the spark docs also use lambda expressions a lot -
 surely those examples also would not work if this is always an issue with
 lambdas?

 On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

 Hi Lee, it's actually not related to threading at all - you would still
 have the same problem even if you were using a single thread. See this
 section (
 https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs.


 On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:


 On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

 Your lambda expressions on the RDDs in the SecondRollup class are
 closing around the context, and Spark has special logic to ensure that all
 variables in a closure used on an RDD are Serializable - I hate linking to
 Quora, but there's a good explanation here:
 http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


 Ah, I see!  So if I broke out the lambda expressions into a method on an
 object it would prevent this issue.  Essentially, don't use lambda
 expressions when using threads.

 Thanks again, I appreciate the help.




Re: Access several s3 buckets, with credentials containing /

2015-06-06 Thread Sujit Pal
Hi Pierre,

One way is to recreate your credentials until AWS generates one without a
slash character in it. Another way I've been using is to pass these
credentials outside the S3 file path by setting the following (where sc is
the SparkContext).

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", ACCESS_KEY)

sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY)
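
(That is the PySpark form; in the Scala shell the equivalent of those two lines
would be roughly:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", SECRET_KEY)

with ACCESS_KEY and SECRET_KEY being your credential strings.)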

After that you can define the RDDs more simply:

val c1 = sc.textFile("s3n://bucket1/file.csv")

-sujit



On Fri, Jun 5, 2015 at 3:55 AM, Steve Loughran ste...@hortonworks.com
wrote:


  On 5 Jun 2015, at 08:03, Pierre B 
 pierre.borckm...@realimpactanalytics.com wrote:
 
  Hi list!
 
  My problem is quite simple.
  I need to access several S3 buckets, using different credentials.:
  ```
  val c1 = sc.textFile("s3n://[ACCESS_KEY_ID1:SECRET_ACCESS_KEY1]@bucket1/file.csv").count
  val c2 = sc.textFile("s3n://[ACCESS_KEY_ID2:SECRET_ACCESS_KEY2]@bucket2/file.csv").count
  val c3 = sc.textFile("s3n://[ACCESS_KEY_ID3:SECRET_ACCESS_KEY3]@bucket3/file.csv").count
  ...
  ```
 
  One or several of those AWS credentials might contain a / in the secret
  access key.
  This is a known problem and, from my research, the only ways to deal with
  these / characters are:
  1/ use environment variables to set the AWS credentials, then access the
  s3 buckets without specifying the credentials
  2/ set the hadoop configuration to contain the credentials.
 
  However, none of these solutions allows me to access different buckets
  with different credentials.
 
  Can anyone help me on this?
 
  Thanks
 
  Pierre

 This is a long-known outstanding bug in Hadoop s3n that nobody has ever sat down
 to fix. One subtlety is that it's really hard to test, as you need credentials
 with a / in them.

 The general best practice is to recreate your credentials.

 Now, if you can get the patch to work against hadoop trunk, I promise I
 will commit it:
 https://issues.apache.org/jira/browse/HADOOP-3733





Re: write multiple outputs by key

2015-06-06 Thread Will Briggs
I believe groupByKey currently requires that all items for a specific key fit
into a single executor's memory:
http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

This previous discussion has some pointers if you must use groupByKey, 
including adding a low-cardinality hash to your key: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html

Another option I didn't see mentioned would be to persist / cache the initial 
RDD, calculate the set of distinct key values out of it, and then derive a set 
of filtered RDDs from the cached dataset, one for each key. For this to work, 
your set of unique keys would need to fit into your driver's memory.
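
A rough sketch of that last option, reusing the names from your snippet (this
assumes the usual hiveContext implicits for toDF, and that flatKeyFromWrf yields
((zone, z, year, month), record) pairs where record is a case class):

// Cache the keyed RDD, collect the distinct keys on the driver, then build and
// insert one DataFrame per key -- hiveContext.sql stays on the driver here.
val keyed = varWRDD.map(FlatMapUtilClass().flatKeyFromWrf)
keyed.cache()

val keys = keyed.keys.distinct().collect()       // must fit in driver memory
keys.foreach { key =>
  val (zone, z, year, month) = key
  val df = keyed.filter(_._1 == key).map(_._2).toDF()
  df.registerTempTable("table_4Dim")
  hiveContext.sql("INSERT OVERWRITE table 4dim partition " +
    "(zone=" + zone + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
    "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, " +
    "qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
}

The tradeoff is one filter pass over the cached data per key (50 in your case),
but no single task ever has to hold a whole key's dataset as groupByKey would.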

Regards,
Will

On June 6, 2015, at 11:07 AM, patcharee patcharee.thong...@uni.no wrote:

Hi,

How can I write to multiple outputs for each key? I tried to create a
custom partitioner and to define the number of partitions, but it does not work.
Only a few tasks/partitions (the number equals the number of
all key combinations) get large datasets; the data is not split across all
tasks/partitions. The job failed because those few tasks handled far too large
datasets. Below is my code snippet.

val varWFlatRDD = 
  varWRDD.map(FlatMapUtilClass().flatKeyFromWrf).groupByKey() // keys are (zone, z, year, month)
    .foreach(
      x => {
        val z = x._1._1
        val year = x._1._2
        val month = x._1._3
        val df_table_4dim = x._2.toList.toDF()
        df_table_4dim.registerTempTable("table_4Dim")
        hiveContext.sql("INSERT OVERWRITE table 4dim partition " +
          "(zone=" + ZONE + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
          "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, " +
          "qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
      })


From the Spark history UI, at groupByKey there are 1000 tasks (equal
to the parent's number of partitions). At foreach there are 1000 tasks as
well, but only 50 tasks (the same as the number of all key combinations) get datasets.

How can I fix this problem? Any suggestions are appreciated.

BR,
Patcharee




