Re: Poor performance caused by coalesce to 1

2021-02-03 Thread Stéphane Verlet
I had that issue too and from what I gathered, it is an expected
optimization... Try using repartition instead.
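
For example, a minimal sketch (df stands for the small aggregated dataset and the
output paths are placeholders):

// coalesce(1) can collapse the whole upstream plan to a single partition,
// so the expensive transformations also run with parallelism 1.
df.coalesce(1).write.parquet("/tmp/out_coalesce")

// repartition(1) inserts a shuffle boundary: the upstream stages keep their
// original parallelism and only the final write uses one partition.
df.repartition(1).write.parquet("/tmp/out_repartition")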


On Feb 3, 2021, at 11:55, James Yu wrote:
>Hi Team,
>
>We are running into this poor performance issue and seeking your
>suggestion on how to improve it:
>
>We have a particular dataset which we aggregate from other datasets and
>would like to write out to one single file (because it is small enough).  We
>found that after a series of transformations (GROUP BYs, FLATMAPs), we
>coalesced the final RDD to 1 partition before writing it out, and this
>coalesce degrades the performance. It is not that the additional coalesce
>operation takes additional runtime, but it somehow dictates the number of
>partitions used in the upstream transformations.
>
>We hope there is a simple and useful way to solve this kind of issue
>which we believe is quite common for many people.
>
>
>Thanks
>
>James


Re: Java Rdd of String to dataframe

2017-10-12 Thread Stéphane Verlet
you can specify the schema programmatically

https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
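
A minimal sketch of that approach in Scala (the Java version is analogous); spark,
rowRdd and the two-column layout are assumptions for illustration:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumed layout: one string key plus the raw JSON string as a second column.
val schema = StructType(Seq(
  StructField("key", StringType, nullable = true),
  StructField("json", StringType, nullable = true)))

// rowRdd: RDD[Row], e.g. built from the pair RDD with pairRdd.map { case (k, v) => Row(k, v) }
val df = spark.createDataFrame(rowRdd, schema)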

On Wed, Oct 11, 2017 at 3:35 PM, sk skk  wrote:

> Can we create a dataframe from a Java pair RDD of String? I don’t have a
> schema, as it will be dynamic JSON. I used the Encoders.STRING class.
>
> Any help is appreciated !!
>
> Thanks,
> SK
>


Re: Spark job taking 10s to allocate executors and memory before submitting job

2017-09-28 Thread Stéphane Verlet
Sounds like such a small job. If you are running it on a cluster, have you
considered simply running it locally (master = local)?
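
For example, a minimal sketch (the app name is a placeholder); the same effect can be
had by passing --master "local[*]" to spark-submit:

val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[*]")   // run inside the submitting JVM, no executor allocation step
  .appName("small-job")
  .getOrCreate()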

On Wed, Sep 27, 2017 at 7:06 AM, navneet sharma  wrote:

> Hi,
>
> I am running a Spark job that takes 18s in total: 8 seconds for the actual
> processing (business) logic and 10s for allocating executors and
> memory. How can I reduce this initial time?
>
> Any ideas on how to reduce the time before the Spark job reaches the submitted state?
>
> thanks,
>
> Navneet Sharma
>


Re: Running Hive and Spark together with Dynamic Resource Allocation

2016-10-28 Thread Stéphane Verlet
This works for us

 
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>

<property>
  <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
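Dynamic allocation itself still has to be switched on on the Spark side; a minimal
sketch of the matching settings (shown on a SparkConf for illustration, they can
equally go into spark-defaults.conf):

val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.service.enabled", "true")   // use the external shuffle service registered above
  .set("spark.dynamicAllocation.enabled", "true") // let Spark grow and shrink the executor count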





On Thu, Oct 27, 2016 at 8:13 AM, rachmaninovquartet <
rachmaninovquar...@gmail.com> wrote:

> oblem is that for Spark 1.5.2 with dynamic resource allocation to
> function properly we needed to set y
>


Re: rdd split into new rdd

2015-12-23 Thread Stéphane Verlet
You should be able to do that using mapPartitions.

On Wed, Dec 23, 2015 at 8:24 AM, Ted Yu  wrote:

> bq. {a=1, b=1, c=2, d=2}
>
> Can you elaborate your criteria a bit more ? The above seems to be a Set,
> not a Map.
>
> Cheers
>
> On Wed, Dec 23, 2015 at 7:11 AM, Yasemin Kaya  wrote:
>
>> Hi,
>>
>> I have data
>> *JavaPairRDD<String, TreeMap<String, Integer>>* format. In example:
>>
>> *(1610, {a=1, b=1, c=2, d=2}) *
>>
>> I want to get
>> *JavaPairRDD<String, List>* In example:
>>
>>
>> *(1610, {a, b})*
>> *(1610, {c, d})*
>>
>> Is there a way to solve this problem?
>>
>> Best,
>> yasemin
>> --
>> hiç ender hiç
>>
>
>


Re: rdd split into new rdd

2015-12-23 Thread Stéphane Verlet
I use Scala, but I guess in Java the code would look like this:

JavaPairRDD<String, TreeMap<String, Integer>> rdd = ...;

JavaPairRDD<String, List<String>> rdd2 = rdd.mapPartitionsToPair(function, true);

where function implements
   PairFlatMapFunction<java.util.Iterator<Tuple2<String, TreeMap<String, Integer>>>, String, List<String>>

// Note: the partition iterator yields full (key, TreeMap) tuples, not bare TreeMaps.
Iterable<scala.Tuple2<String, List<String>>>
call(java.util.Iterator<scala.Tuple2<String, TreeMap<String, Integer>>> it) {

    // Iterate over the tuples, split each TreeMap, and emit one (key, list) pair per group

}
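
In Scala the same split can also be done with a plain flatMap; a minimal sketch,
assuming the goal is to group each map's keys by their values (so {a=1, b=1, c=2, d=2}
becomes {a, b} and {c, d}):

// rdd: RDD[(String, TreeMap[String, Int])], e.g. ("1610", TreeMap("a" -> 1, "b" -> 1, "c" -> 2, "d" -> 2))
val rdd2 = rdd.flatMap { case (id, m) =>
  m.groupBy { case (_, value) => value }     // group the entries by their value
    .values
    .map(group => (id, group.keys.toList))   // keep only the keys of each group
}
// rdd2: RDD[(String, List[String])] containing ("1610", List("a", "b")) and ("1610", List("c", "d"))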






On Wed, Dec 23, 2015 at 10:49 AM, Yasemin Kaya <godo...@gmail.com> wrote:

> How can I use mapPartitions? Could you give me an example?
>
> 2015-12-23 17:26 GMT+02:00 Stéphane Verlet <kaweahsoluti...@gmail.com>:
>
>> You should be able to do that using mapPartitions.
>>
>> On Wed, Dec 23, 2015 at 8:24 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq. {a=1, b=1, c=2, d=2}
>>>
>>> Can you elaborate your criteria a bit more ? The above seems to be a
>>> Set, not a Map.
>>>
>>> Cheers
>>>
>>> On Wed, Dec 23, 2015 at 7:11 AM, Yasemin Kaya <godo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have data
>>>> *JavaPairRDD<String, TreeMap<String, Integer>> *format. In example:
>>>>
>>>> *(1610, {a=1, b=1, c=2, d=2}) *
>>>>
>>>> I want to get
>>>> *JavaPairRDD<String, List>* In example:
>>>>
>>>>
>>>> *(1610, {a, b})*
>>>> *(1610, {c, d})*
>>>>
>>>> Is there a way to solve this problem?
>>>>
>>>> Best,
>>>> yasemin
>>>> --
>>>> hiç ender hiç
>>>>
>>>
>>>
>>
>
>
> --
> hiç ender hiç
>


Re: How to kill spark applications submitted using spark-submit reliably?

2015-11-20 Thread Stéphane Verlet
I am not sure; I think it has to do with the signal sent to the process
and how the JVM handles it.

Ctrl-C sends a SIGINT, vs. a TERM signal for the kill command.



On Fri, Nov 20, 2015 at 8:21 PM, Vikram Kone <vikramk...@gmail.com> wrote:

> Thanks for the info Stephane.
> Why does CTRL-C in the terminal running spark-submit kill the app in the
> Spark master correctly w/o any explicit shutdown hooks in the code? Can you
> explain why we need to add the shutdown hook to kill it when launched via a
> shell script?
> For the second issue, I'm not using any thread pool. So not sure why
> killing the app in spark UI doesn't kill the process launched via script
>
>
> On Friday, November 20, 2015, Stéphane Verlet <kaweahsoluti...@gmail.com>
> wrote:
>
>> I solved the first issue by adding a shutdown hook in my code. The
>> shutdown hook gets called when you exit your script (Ctrl-C, kill … but not
>> kill -9).
>>
>> val shutdownHook = scala.sys.addShutdownHook {
>>   try {
>>     sparkContext.stop()
>>     // Make sure to kill any other threads or thread pools you may be running
>>   } catch {
>>     case e: Exception =>
>>       // ...
>>   }
>> }
>>
>> For the other issue, the kill from the UI: I also had that issue. It was
>> caused by a thread pool that I use.
>>
>> So I surrounded my code with a try/finally block to guarantee that the
>> thread pool was shut down when Spark stopped.
>>
>> I hope this helps
>>
>> Stephane
>>
>> On Fri, Nov 20, 2015 at 7:46 PM, Vikram Kone <vikramk...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I'm seeing a strange problem. I have a spark cluster in standalone mode.
>>> I submit spark jobs from a remote node as follows from the terminal
>>>
>>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>>> spark-jobs.jar
>>>
>>> When the app is running and I press Ctrl-C in the console terminal,
>>> the process is killed and so is the app in the Spark master UI. When I
>>> go to the Spark master UI, I see that this app is in state Killed under
>>> Completed applications, which is what I expected to see.
>>>
>>> Now, I created a shell script as follows to do the same
>>>
>>> #!/bin/bash
>>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>>> spark-jobs.jar
>>> echo $! > my.pid
>>>
>>> When I execute the shell script from terminal, as follows
>>>
>>> $> bash myscript.sh
>>>
>>> The application is submitted correctly to the Spark master and I can see it
>>> as one of the running apps in the Spark master UI. But when I kill the
>>> process in my terminal as follows
>>>
>>> $> ps kill $(cat my.pid)
>>>
>>> I see that the process is killed on my machine but the Spark application
>>> is still running in the Spark master! It doesn't get killed.
>>>
>>> I noticed one more thing: when I launch the Spark job via the shell
>>> script and kill the application from the Spark master UI by clicking on "kill"
>>> next to the running application, it gets killed in the Spark UI but I still see
>>> the process running on my machine.
>>>
>>> In both cases, I would expect the remote spark app to be killed and my
>>> local process to be killed.
>>>
>>> Why is this happening? And how can I kill a Spark app launched via a shell
>>> script from the terminal, without going to the Spark master UI?
>>>
>>> I want to launch the Spark app via a script and log the PID so I can
>>> monitor it remotely.
>>>
>>> thanks for the help
>>>
>>>
>>


Re: How to kill spark applications submitted using spark-submit reliably?

2015-11-20 Thread Stéphane Verlet
I solved the first issue by adding a shutdown hook in my code. The shutdown
hook gets called when you exit your script (Ctrl-C, kill … but not kill -9).

val shutdownHook = scala.sys.addShutdownHook {
  try {
    sparkContext.stop()
    // Make sure to kill any other threads or thread pools you may be running
  } catch {
    case e: Exception =>
      // ...
  }
}

For the other issue, the kill from the UI: I also had that issue. It was
caused by a thread pool that I use.

So I surrounded my code with a try/finally block to guarantee that the thread
pool was shut down when Spark stopped, for example as sketched below.
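
A minimal sketch of that pattern (the pool size and the job body are placeholders;
sparkContext is the application's SparkContext):

import java.util.concurrent.Executors

val pool = Executors.newFixedThreadPool(4)
try {
  // submit work to the pool and run the Spark job here
} finally {
  pool.shutdown()       // otherwise non-daemon pool threads can keep the JVM alive
  sparkContext.stop()
}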

I hope this helps

Stephane
​

On Fri, Nov 20, 2015 at 7:46 PM, Vikram Kone  wrote:

> Hi,
> I'm seeing a strange problem. I have a spark cluster in standalone mode. I
> submit spark jobs from a remote node as follows from the terminal
>
> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
> spark-jobs.jar
>
> When the app is running and I press Ctrl-C in the console terminal,
> the process is killed and so is the app in the Spark master UI. When I
> go to the Spark master UI, I see that this app is in state Killed under
> Completed applications, which is what I expected to see.
>
> Now, I created a shell script as follows to do the same
>
> #!/bin/bash
> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
> spark-jobs.jar
> echo $! > my.pid
>
> When I execute the shell script from terminal, as follows
>
> $> bash myscript.sh
>
> The application is submitted correctly to the Spark master and I can see it as
> one of the running apps in the Spark master UI. But when I kill the process
> in my terminal as follows
>
> $> ps kill $(cat my.pid)
>
> I see that the process is killed on my machine but the Spark application is
> still running in the Spark master! It doesn't get killed.
>
> I noticed one more thing: when I launch the Spark job via the shell
> script and kill the application from the Spark master UI by clicking on "kill"
> next to the running application, it gets killed in the Spark UI but I still see
> the process running on my machine.
>
> In both cases, I would expect the remote spark app to be killed and my
> local process to be killed.
>
> Why is this happening? And how can I kill a Spark app launched via a shell
> script from the terminal, without going to the Spark master UI?
>
> I want to launch the Spark app via a script and log the PID so I can monitor
> it remotely.
>
> thanks for the help
>
>


Re: PairRDD from SQL

2015-11-04 Thread Stéphane Verlet
sqlContext.sql(
  "SELECT item_id, flight_id, SUM(metric1) AS metric1 FROM mytable GROUP BY item_id, flight_id")
  .map(row => ((row.getString(0), row.getString(1)), row.getInt(2)))

On Wed, Nov 4, 2015 at 1:44 PM, pratik khadloya  wrote:

> Hello,
>
> Is it possible to have a pair RDD from the below SQL query.
> The pair being ((item_id, flight_id), metric1)
>
> item_id, flight_id  are part of group by.
>
> SELECT
>   item_id,
>   flight_id,
>   SUM(metric1) AS metric1
> FROM mytable
> GROUP BY
>   item_id,
>   flight_id
>
>
> Thanks,
> Pratik
>


Re: Does Spark automatically run different stages concurrently when possible?

2015-01-10 Thread Stéphane Verlet
From your pseudo code, it would be sequential and done twice:

1+2+3
then 1+2+4

If you do a .cache() in step 2 then you would have 1+2+3, then 4.

I ran several steps in parallel from the same program, but never using the
same source RDD, so I do not know the limitations there. I simply started 2
threads and forced execution by calling first() to work around the lazy/delayed
execution.
I did not find in my case that it was improving performance, because the same
amount of total resources was used by Spark.
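
A minimal sketch of that pattern, with isA, isB, processA and processB standing in
for the filtering and processing steps of the pseudo code above:

val abOnly = lines.filter(r => isA(r) || isB(r)).cache()

// Run each branch in its own thread; calling an action (count() here) forces
// execution, so the two jobs can be scheduled concurrently.
val tA = new Thread(new Runnable {
  def run(): Unit = { abOnly.filter(isA).map(processA).count() }
})
val tB = new Thread(new Runnable {
  def run(): Unit = { abOnly.filter(isB).map(processB).count() }
})
tA.start(); tB.start()
tA.join(); tB.join()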

Stephane





On Sat, Jan 10, 2015 at 11:24 AM, YaoPau jonrgr...@gmail.com wrote:

 I'm looking for ways to reduce the runtime of my Spark job.  My code is a
 single file of scala code and is written in this order:

 (1) val lines = Import full dataset using sc.textFile
 (2) val ABonly = Parse out all rows that are not of type A or B
 (3) val processA = Process only the A rows from ABonly
 (4) val processB = Process only the B rows from ABonly

 Is Spark doing (1) then (2) then (3) then (4) ... or is it by default doing
 (1) then (2) then branching to both (3) and (4) simultaneously and running
 both in parallel?  If not, how can I make that happen?

 Jon





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: SQL query in scala API

2014-12-04 Thread Stéphane Verlet
Disclaimer: I am new to Spark.

I did something similar in a prototype which works, but that I did not test
at scale yet:

// users: RDD[(zip, user_id)] pairs, aggregated per zip
val agg = users.aggregateByKey(new CustomAggregation())(
  CustomAggregation.sequenceOp, CustomAggregation.comboOp)

class CustomAggregation() extends Serializable {
  var count: Long = 0
  var users: Set[String] = Set()
}

object CustomAggregation {

  def sequenceOp(agg: CustomAggregation, user_id: String): CustomAggregation = {
    agg.count += 1
    agg.users += user_id
    agg
  }

  def comboOp(agg: CustomAggregation, agg2: CustomAggregation): CustomAggregation = {
    agg.count += agg2.count
    agg.users ++= agg2.users
    agg
  }

}


That should give you the aggregation; the distinct count is the size of the users
set.
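
If you prefer not to write the helper class, the same thing can be sketched with a
(count, set) tuple as the accumulator, assuming users is an RDD[(String, String)] of
(zip, user_id) pairs as in your example:

val perZip = users.aggregateByKey((0L, Set.empty[String]))(
  (acc, user) => (acc._1 + 1, acc._2 + user),   // count every row, collect the user
  (a, b)      => (a._1 + b._1, a._2 ++ b._2))   // merge partial results

// (zip, (count, distinctCount))
val counts = perZip.mapValues { case (c, set) => (c, set.size) }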

I hope this helps

Stephane

On Wed, Dec 3, 2014 at 5:47 PM, Arun Luthra arun.lut...@gmail.com wrote:

 I'm wondering how to do this kind of SQL query with PairRDDFunctions.

 SELECT zip, COUNT(user), COUNT(DISTINCT user)
 FROM users
 GROUP BY zip

 In the Spark scala API, I can make an RDD (called users) of key-value
 pairs where the keys are zip (as in ZIP code) and the values are user id's.
 Then I can compute the count and distinct count like this:

 val count = users.mapValues(_ => 1).reduceByKey(_ + _)
 val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)

 Then, if I want count and countDistinct in the same table, I have to join
 them on the key.

 Is there a way to do this without doing a join (and without using SQL or
 spark SQL)?

 Arun



Re: Spark 1.1.0 Can not read snappy compressed sequence file

2014-12-04 Thread Stéphane Verlet
Yes, it is working with this in spark-env.sh:

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.0.4.1.jar

I tried so many things that I do not even know where I got this from :-(
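
Another knob that should achieve the same thing (a sketch only, with a placeholder
path; I have not verified it in this setup) is Spark's extra library path properties:

val nativeLibDir = "/opt/hadoop/lib/native"   // placeholder; point it at the dir with libsnappy/libhadoop
val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.extraLibraryPath", nativeLibDir)
  .set("spark.executor.extraLibraryPath", nativeLibDir)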

Stephane


On Wed, Nov 26, 2014 at 8:08 AM, cjdc cristovao.corde...@cern.ch wrote:

 Hi,

 did you get a solution for this?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-0-Can-not-read-snappy-compressed-sequence-file-tp18394p19876.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Spark 1.1.0 Can not read snappy compressed sequence file

2014-11-07 Thread Stéphane Verlet
I first saw this using SparkSQL but the result is the same with plain
Spark.

14/11/07 19:46:36 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native
Method)
at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)

Full stack below 

I tried many different things without luck:
* extracted the libsnappyjava.so from the Spark assembly and put it on the library path
* added -Djava.library.path=... to SPARK_MASTER_OPTS and SPARK_WORKER_OPTS
* added the library path to SPARK_LIBRARY_PATH
* added the Hadoop library path to SPARK_LIBRARY_PATH
* rebuilt Spark with different versions (previous and next) of Snappy (as seen when Google-ing)


Env :
   Centos 6.4
   Hadoop 2.3 (CDH5.1)
   Running in standalone/local mode


Any help would be appreciated

Thank you

Stephane


scala> import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.BytesWritable

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.NullWritable

scala> var seq =
sc.sequenceFile[NullWritable,Text]("/home/lfs/warehouse/base.db/mytable/event_date=2014-06-01/00_0").map(_._2.toString())
14/11/07 19:46:19 INFO MemoryStore: ensureFreeSpace(157973) called with
curMem=0, maxMem=278302556
14/11/07 19:46:19 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 154.3 KB, free 265.3 MB)
seq: org.apache.spark.rdd.RDD[String] = MappedRDD[2] at map at console:15

scala> seq.collect().foreach(println)
14/11/07 19:46:35 INFO FileInputFormat: Total input paths to process : 1
14/11/07 19:46:35 INFO SparkContext: Starting job: collect at console:18
14/11/07 19:46:35 INFO DAGScheduler: Got job 0 (collect at console:18)
with 2 output partitions (allowLocal=false)
14/11/07 19:46:35 INFO DAGScheduler: Final stage: Stage 0(collect at
console:18)
14/11/07 19:46:35 INFO DAGScheduler: Parents of final stage: List()
14/11/07 19:46:35 INFO DAGScheduler: Missing parents: List()
14/11/07 19:46:35 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at
map at console:15), which has no missing parents
14/11/07 19:46:35 INFO MemoryStore: ensureFreeSpace(2928) called with
curMem=157973, maxMem=278302556
14/11/07 19:46:35 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 2.9 KB, free 265.3 MB)
14/11/07 19:46:36 INFO DAGScheduler: Submitting 2 missing tasks from Stage
0 (MappedRDD[2] at map at console:15)
14/11/07 19:46:36 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/11/07 19:46:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
0, localhost, PROCESS_LOCAL, 1243 bytes)
14/11/07 19:46:36 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
1, localhost, PROCESS_LOCAL, 1243 bytes)
14/11/07 19:46:36 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
14/11/07 19:46:36 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
14/11/07 19:46:36 INFO HadoopRDD: Input split:
file:/home/lfs/warehouse/base.db/mytable/event_date=2014-06-01/00_0:6504064+6504065
14/11/07 19:46:36 INFO HadoopRDD: Input split:
file:/home/lfs/warehouse/base.db/mytable/event_date=2014-06-01/00_0:0+6504064
14/11/07 19:46:36 INFO deprecation: mapred.tip.id is deprecated. Instead,
use mapreduce.task.id
14/11/07 19:46:36 INFO deprecation: mapred.task.is.map is deprecated.
Instead, use mapreduce.task.ismap
14/11/07 19:46:36 INFO deprecation: mapred.task.partition is deprecated.
Instead, use mapreduce.task.partition
14/11/07 19:46:36 INFO deprecation: mapred.job.id is deprecated. Instead,
use mapreduce.job.id
14/11/07 19:46:36 INFO deprecation: mapred.task.id is deprecated. Instead,
use mapreduce.task.attempt.id
14/11/07 19:46:36 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native
Method)
at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
at
org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:190)
at
org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:176)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1915)
at
org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1759)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1773)
at
org.apache.hadoop.mapred.SequenceFileRecordReader.init(SequenceFileRecordReader.java:49)
at
org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64)
at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:197)
at