Re: org.apache.spark.SparkException: Task not serializable

2017-03-10 Thread Robin
Hi Mina,

Can you paste your new code here, please?
I am hitting this issue too but do not quite follow Ankur's suggestion.

thanks 
Robin

---Original---
From: "Mina Aslani"
Date: 2017/3/7 05:32:10
To: "Ankur Srivastava";
Cc: "user@spark.apache.org";
Subject: Re: org.apache.spark.SparkException: Task not serializable


Thank you Ankur for the quick response, really appreciate it! Making the class 
serializable resolved the exception!

Best regards,
Mina


On Mon, Mar 6, 2017 at 4:20 PM, Ankur Srivastava  
wrote:
The fix for this is to make your class Serializable. The reason is that the closures 
you have defined in the class need to be serialized and copied over to all 
executor nodes.
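
To illustrate, a minimal sketch of the two usual fixes (hypothetical class and method names, not Ankur's or Mina's actual code): either make the enclosing class Serializable, or define the function so that it does not capture the enclosing instance at all.

import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Sketch only: LineCounter and KeepAll are made-up names.
public class LineCounter implements Serializable {  // option 1: make the enclosing class Serializable

    // Option 2: a static nested class does not hold a reference to the outer
    // instance, so the enclosing class would not need to be serializable at all.
    // (Spark's Function interfaces already extend java.io.Serializable.)
    static class KeepAll implements Function<String, Boolean> {
        public Boolean call(String s) {
            return true;
        }
    }

    public long countLines(JavaRDD<String> logData) {
        return logData.filter(new KeepAll()).count();
    }
}

An anonymous inner Function declared inside an instance method implicitly references the enclosing object, which is typically what drags the whole class into the closure that Spark has to serialize.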

Hope this helps.


Thanks
Ankur


On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani  wrote:
Hi,

I am trying to get started with Spark and count the number of lines of a text file on my
Mac, however I get an org.apache.spark.SparkException: Task not serializable
error on JavaRDD<String> logData = javaCtx.textFile(file);. Please see below for
the sample code and the stack trace. Any idea why this error is thrown?

Best regards,
Mina

System.out.println("Creating Spark Configuration");
SparkConf javaConf = new SparkConf();
javaConf.setAppName("My First Spark Java Application");
javaConf.setMaster("PATH to my spark");
System.out.println("Creating Spark Context");
JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
System.out.println("Loading the Dataset and will further process it");
String file = "file:///file.txt";
JavaRDD<String> logData = javaCtx.textFile(file);
long numLines = logData.filter(new Function<String, Boolean>() {
   public Boolean call(String s) {
  return true;
   }
}).count();

System.out.println("Number of Lines in the Dataset "+numLines);

javaCtx.close();

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)

Re: question on Write Ahead Log (Spark Streaming )

2017-03-10 Thread Dibyendu Bhattacharya
Hi,

You could also use this Receiver :
https://github.com/dibbhatt/kafka-spark-consumer

This is part of spark-packages also :
https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

With this receiver you do not need to enable the WAL, and you can still recover
from driver failure with no data loss. You can refer to
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for
more details, or you can reach out to me.

Regards,
Dibyendu


On Wed, Mar 8, 2017 at 8:58 AM, kant kodali  wrote:

> Hi All,
>
> I am using a receiver-based approach, and I understand that the Spark
> Streaming APIs convert the data coming from the receiver into blocks, and
> that these in-memory blocks are also stored in the WAL if one enables it.
> My upstream source, which is not Kafka, can also replay: if I don't send an
> ack, it resends the data, so I don't strictly have to write the received
> data to the WAL. However, I still need to enable the WAL, correct? Because
> the blocks that are in memory need to be written to the WAL so they can be
> recovered later.
>
> Thanks,
> kant
>
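
For reference, a minimal sketch (my own, with placeholder paths and a placeholder socket source) of what enabling the receiver WAL usually looks like in code: the flag plus a checkpoint directory on fault-tolerant storage.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WalExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
                .setAppName("WalExample")
                // Write received blocks to the write ahead log before acknowledging them.
                .set("spark.streaming.receiver.writeAheadLog.enable", "true");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // The WAL lives under the checkpoint directory, so this must be
        // fault-tolerant storage such as HDFS (the path here is a placeholder).
        ssc.checkpoint("hdfs:///checkpoints/my-streaming-app");

        // Placeholder receiver-based source and output operation.
        ssc.socketTextStream("localhost", 9999).print();

        ssc.start();
        ssc.awaitTermination();
    }
}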


How to improve performance of saveAsTextFile()

2017-03-10 Thread Parsian, Mahmoud
How can I improve the performance of JavaRDD.saveAsTextFile("hdfs://…")?
This is taking over 30 minutes on a cluster of 10 nodes.
Running Spark on YARN.

The JavaRDD has 120 million entries.

Thank you,
Best regards,
Mahmoud
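
Not an answer from the thread, but for illustration, the knobs people usually try first are the number of output partitions (each partition is written by one task, so too few means too little write parallelism, and too many means lots of tiny files) and output compression. A hedged sketch with made-up numbers and paths:

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaRDD;

public final class SaveTuning {
    // Sketch only: the partition count and output path are placeholders to tune.
    public static void save(JavaRDD<String> lines) {
        lines
            // More partitions -> more concurrent write tasks; use coalesce(n)
            // instead if the goal is only to reduce the partition count.
            .repartition(200)
            // Compressing the output reduces the bytes written to HDFS.
            .saveAsTextFile("hdfs:///output/my-data", GzipCodec.class);
    }
}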


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread Tathagata Das
That config is not safe. Please do not use it.

On Mar 10, 2017 10:03 AM, "shyla deshpande" 
wrote:

> I have a spark streaming application which processes 3 kafka streams and
> has 5 output operations.
>
> Not sure what should be the setting for spark.streaming.concurrentJobs.
>
> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
> will be run sequentially?
>
> 2. If I had 6 cores, what would be an ideal setting for concurrentJobs in
> this situation?
>
> I appreciate your input. Thanks
>


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Justin Miller
I've created a ticket here: https://issues.apache.org/jira/browse/SPARK-19888 


Thanks,
Justin

> On Mar 10, 2017, at 1:14 PM, Michael Armbrust  wrote:
> 
> If you have a reproduction you should open a JIRA.  It would be great if 
> there is a fix.  I'm just saying I know a similar issue does not exist in 
> structured streaming.
> 
> On Fri, Mar 10, 2017 at 7:46 AM, Justin Miller  > wrote:
> Hi Michael,
> 
> I'm experiencing a similar issue. Will this not be fixed in Spark Streaming?
> 
> Best,
> Justin
> 
>> On Mar 10, 2017, at 8:34 AM, Michael Armbrust wrote:
>> 
>> One option here would be to try Structured Streaming.  We've added an option 
>> "failOnDataLoss" that will cause Spark to just skip ahead when this 
>> exception is encountered (it's off by default though so you don't silently 
>> miss data).
>> 
>> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman wrote:
>> I am using Spark streaming and reading data from Kafka using
>> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
>> smallest.
>> 
>> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
>> and my spark job crashes.
>> 
>> I want to understand if there is a graceful way to handle this failure and
>> not kill the job. I want to keep ignoring these exceptions, as some other
>> partitions are fine and I am okay with data loss.
>> 
>> Is there any way to handle this and not have my spark job crash? I have no
>> option of increasing the kafka retention period.
>> 
>> I tried to have the DStream returned by createDirectStream() wrapped in a
>> Try construct, but since the exception happens in the executor, the Try
>> construct didn't take effect. Do you have any ideas of how to handle this?
>> 
>> 
>> 
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org



Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
If you have a reproduction you should open a JIRA.  It would be great if
there is a fix.  I'm just saying I know a similar issue does not exist in
structured streaming.

On Fri, Mar 10, 2017 at 7:46 AM, Justin Miller <
justin.mil...@protectwise.com> wrote:

> Hi Michael,
>
> I'm experiencing a similar issue. Will this not be fixed in Spark
> Streaming?
>
> Best,
> Justin
>
> On Mar 10, 2017, at 8:34 AM, Michael Armbrust 
> wrote:
>
> One option here would be to try Structured Streaming.  We've added an
> option "failOnDataLoss" that will cause Spark to just skip ahead when this
> exception is encountered (it's off by default though so you don't silently
> miss data).
>
> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <
> ram.the.m...@gmail.com> wrote:
>
>> I am using Spark streaming and reading data from Kafka using
>> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
>> smallest.
>>
>> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
>> and my spark job crashes.
>>
>> I want to understand if there is a graceful way to handle this failure and
>> not kill the job. I want to keep ignoring these exceptions, as some other
>> partitions are fine and I am okay with data loss.
>>
>> Is there any way to handle this and not have my spark job crash? I have no
>> option of increasing the kafka retention period.
>>
>> I tried to have the DStream returned by createDirectStream() wrapped in a
>> Try construct, but since the exception happens in the executor, the Try
>> construct didn't take effect. Do you have any ideas of how to handle this?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: Apparent memory leak involving count

2017-03-10 Thread Facundo Domínguez
> You always seem to generate a new RDD instead of reusing the existing one, so it
> does not seem surprising that the memory need is growing.

Thanks for taking a look. Unfortunately, memory grows regardless of
whether just one RDD is used or one per iteration.

> This is how it can display the history in the UI. It's normal for the 
> bookkeeping to keep growing because it's recording every job. You can 
> configure it to keep records about fewer jobs. But thousands of entries isn't 
> exactly big.

This was it. Changing all the retention limits to 0 did the job.
I agree that it isn't big. But it adds noise when trying to determine
if an application runs in bounded space or not. So I'm happy that it
can be quietened.
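
For reference, the retention limits referred to here are, as far as I can tell, the spark.ui.retained* settings; a hedged sketch of dialing them down (the values are arbitrary):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LowRetention {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Leak")
                // Keep far fewer finished jobs/stages/tasks in the driver's UI bookkeeping.
                .set("spark.ui.retainedJobs", "10")
                .set("spark.ui.retainedStages", "10")
                .set("spark.ui.retainedTasks", "1000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... run the workload from Leak.java below ...
        sc.stop();
    }
}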

Thank you.
Facundo


On Thu, Mar 9, 2017 at 11:46 AM, Sean Owen  wrote:
> The driver keeps metrics on everything that has executed. This is how it can
> display the history in the UI. It's normal for the bookkeeping to keep
> growing because it's recording every job. You can configure it to keep
> records about fewer jobs. But thousands of entries isn't exactly big.
>
> On Thu, Mar 9, 2017 at 2:24 PM Facundo Domínguez 
> wrote:
>>
>> Hello,
>>
>> Some heap profiling shows that memory grows under a TaskMetrics class.
>> Thousands of live hashmap entries are accumulated.
>> Would it be possible to disable collection of metrics? I've been
>> looking for settings to disable it but nothing relevant seems to come
>> up.
>>
>> Thanks,
>> Facundo
>>
>> On Wed, Mar 8, 2017 at 2:02 PM, Facundo Domínguez
>>  wrote:
>> > Hello,
>> >
>> > I'm running JavaRDD.count() repeatedly on a small RDD, and it seems to
>> > increase the size of the Java heap over time until the default limit
>> > is reached and an OutOfMemoryException is thrown. I'd expect this
>> > program to run in constant space, and the problem carries over to some
>> > more complicated tests I need to get working.
>> >
>> > My spark version is 2.1.0 and I'm running this using nix in debian
>> > jessie.
>> >
>> > Is there anything elemental that I could do to keep memory bounded?
>> >
>> > I'm copying the program below and an example of the output.
>> >
>> > Thanks in advance,
>> > Facundo
>> >
>> > /* Leak.java */
>> > import java.util.*;
>> > import java.nio.charset.StandardCharsets;
>> > import java.nio.file.Files;
>> > import java.nio.file.Paths;
>> > import java.io.IOException;
>> > import java.io.Serializable;
>> > import org.apache.spark.api.java.*;
>> > import org.apache.spark.SparkConf;
>> > import org.apache.spark.api.java.function.*;
>> > import org.apache.spark.sql.*;
>> >
>> > public class Leak {
>> >
>> >   public static void main(String[] args) throws IOException {
>> >
>> > SparkConf conf = new SparkConf().setAppName("Leak");
>> > JavaSparkContext sc = new JavaSparkContext(conf);
>> > SQLContext sqlc = new SQLContext(sc);
>> >
>> > for(int i=0;i<50;i++) {
>> >   System.gc();
>> >   long mem = Runtime.getRuntime().totalMemory();
>> >   System.out.println("java total memory: " + mem);
>> >   for(String s :
>> > Files.readAllLines(Paths.get("/proc/self/status"),
>> > StandardCharsets.UTF_8)) {
>> >   if (0 <= s.indexOf("VmRSS"))
>> > System.out.println(s);
>> >   }
>> >   for(int j=0;j<2999;j++) {
>> > JavaRDD<Double> rdd =
>> > sc.parallelize(Arrays.asList(1.0,2.0,3.0));
>> > rdd.count();
>> >   }
>> > }
>> > sc.stop();
>> >   }
>> > }
>> >
>> > # example output
>> > $ spark-submit --master local[1] --class Leak leak/build/libs/leak.jar
>> > 17/03/08 11:26:37 WARN NativeCodeLoader: Unable to load native-hadoop
>> > library for your platform... using builtin-java classes where
>> > applicable
>> > 17/03/08 11:26:37 WARN Utils: Your hostname, fd-tweag resolves to a
>> > loopback address: 127.0.0.1; using 192.168.1.42 instead (on interface
>> > wlan0)
>> > 17/03/08 11:26:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind
>> > to another address
>> > java total memory: 211288064
>> > VmRSS:  200488 kB
>> > java total memory: 456654848
>> > VmRSS:  656472 kB
>> > java total memory: 562036736
>> > VmRSS:  677156 kB
>> > java total memory: 562561024
>> > VmRSS:  689424 kB
>> > java total memory: 562561024
>> > VmRSS:  701760 kB
>> > java total memory: 562561024
>> > VmRSS:  732540 kB
>> > java total memory: 562561024
>> > VmRSS:  748468 kB
>> > java total memory: 562036736
>> > VmRSS:  770680 kB
>> > java total memory: 705691648
>> > VmRSS:  789632 kB
>> > java total memory: 706740224
>> > VmRSS:  802720 kB
>> > java total memory: 704118784
>> > VmRSS:  832740 kB
>> > java total memory: 705691648
>> > VmRSS:  850808 kB
>> > java total memory: 704118784
>> > VmRSS:  875232 kB
>> > java total memory: 705691648
>> > VmRSS:  898716 kB
>> > java total memory: 701497344
>> > VmRSS:  919388 kB
>> > java total memory: 905445376
>> > VmRSS:  942628 kB
>> > java total memory: 904921088
>> > 

spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread shyla deshpande
I have a spark streaming application which processes 3 kafka streams and
has 5 output operations.

Not sure what should be the setting for spark.streaming.concurrentJobs.

1. If the concurrentJobs setting is 4 does that mean 2 output operations
will be run sequentially?

2. If I had 6 cores, what would be an ideal setting for concurrentJobs in
this situation?

I appreciate your input. Thanks


Re: can spark take advantage of ordered data?

2017-03-10 Thread Jonathan Coveney
While I was at Two Sigma I ended up implementing something similar to what
Koert described... you can check it out here:
https://github.com/twosigma/flint/blob/master/src/main/scala/com/twosigma/flint/rdd/OrderedRDD.scala.
They've built a lot more on top of this (including support for dataframes
etc).

2017-03-10 9:45 GMT-05:00 Koert Kuipers :

> this shouldn't be too hard. adding something to spark-sorted or to the
> dataframe/dataset logical plan that says "trust me, i am already
> partitioned and sorted" seems doable. however you most likely need a custom
> hash partitioner, and you have to be careful to read the data in without
> file splitting.
>
> On Mar 10, 2017 9:10 AM, "sourabh chaki"  wrote:
>
>> My use case is also quite similar. I have 2 feeds. One 3TB and another
>> 100GB. Both the feeds are generated by hadoop reduce operation and
>> partitioned by hadoop hashpartitioner. 3TB feed has 10K partitions whereas
>> 100GB file has 200 partitions.
>>
>> Now when I do a join between these two feeds using spark, spark shuffles
>> both the RDDS and it takes long time to complete. Can we do something so
>> that spark can recognise the existing partitions of 3TB feed and shuffles
>> only 200GB feed?
>> It can be mapside scan for bigger RDD and shuffle read from smaller RDD?
>>
>> I have looked at spark-sorted project, but that project does not utilise
>> the pre-existing partitions in the feed.
>> Any pointer will be helpful.
>>
>> Thanks
>> Sourabh
>>
>> On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid 
>> wrote:
>>
>>> Hi Jonathan,
>>>
>>> you might be interested in https://issues.apache.org/jira/browse/SPARK-3655
>>> (not yet available) and https://github.com/tresata/spark-sorted
>>> (not part of spark, but it is available right now).
>>> Hopefully that's what you are looking for.  To the best of my knowledge that
>>> covers what is available now / what is being worked on.
>>>
>>> Imran
>>>
>>> On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney 
>>> wrote:
>>>
 Hello all,

 I am wondering if spark already has support for optimizations on sorted
 data and/or if such support could be added (I am comfortable dropping to a
 lower level if necessary to implement this, but I'm not sure if it is
 possible at all).

 Context: we have a number of data sets which are essentially already
 sorted on a key. With our current systems, we can take advantage of this to
 do a lot of analysis in a very efficient fashion...merges and joins, for
 example, can be done very efficiently, as can folds on a secondary key and
 so on.

 I was wondering if spark would be a fit for implementing these sorts of
 optimizations? Obviously it is sort of a niche case, but would this be
 achievable? Any pointers on where I should look?

>>>
>>>
>>


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Justin Miller
Hi Michael,

I'm experiencing a similar issue. Will this not be fixed in Spark Streaming?

Best,
Justin

> On Mar 10, 2017, at 8:34 AM, Michael Armbrust  wrote:
> 
> One option here would be to try Structured Streaming.  We've added an option 
> "failOnDataLoss" that will cause Spark to just skip ahead when this 
> exception is encountered (it's off by default though so you don't silently 
> miss data).
> 
> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman wrote:
> I am using Spark streaming and reading data from Kafka using
> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> smallest.
> 
> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
> and my spark job crashes.
> 
> I want to understand if there is a graceful way to handle this failure and
> not kill the job. I want to keep ignoring these exceptions, as some other
> partitions are fine and I am okay with data loss.
> 
> Is there any way to handle this and not have my spark job crash? I have no
> option of increasing the kafka retention period.
> 
> I tried to have the DStream returned by createDirectStream() wrapped in a
> Try construct, but since the exception happens in the executor, the Try
> construct didn't take effect. Do you have any ideas of how to handle this?
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org



Re: can spark take advantage of ordered data?

2017-03-10 Thread Yong Zhang
I think it is an interesting requirement, but I am not familiar enough with Spark 
to say whether it can be done with the latest Spark version or not.


From my understanding, you are looking for some API from Spark to read the 
source directly into a ShuffledRDD, which indeed needs (K, V, and a Partitioner 
instance).


I don't think Spark provides that directly as of now, but in your case, it makes 
sense to create a JIRA for Spark to support this in the future.


For right now, maybe there are ways to use Spark's developer API to do what you 
need, and I will leave that to other Spark experts to confirm.


Yong



From: sourabh chaki 
Sent: Friday, March 10, 2017 9:03 AM
To: Imran Rashid
Cc: Jonathan Coveney; user@spark.apache.org
Subject: Re: can spark take advantage of ordered data?

My use case is also quite similar. I have 2 feeds. One 3TB and another 100GB. 
Both the feeds are generated by hadoop reduce operation and partitioned by 
hadoop hashpartitioner. 3TB feed has 10K partitions whereas 100GB file has 200 
partitions.

Now when I do a join between these two feeds using spark, spark shuffles both 
the RDDS and it takes long time to complete. Can we do something so that spark 
can recognise the existing partitions of 3TB feed and shuffles only 200GB feed?
It can be mapside scan for bigger RDD and shuffle read from smaller RDD?

I have looked at spark-sorted project, but that project does not utilise the 
pre-existing partitions in the feed.
Any pointer will be helpful.

Thanks
Sourabh

On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid wrote:
Hi Jonathan,

you might be interested in https://issues.apache.org/jira/browse/SPARK-3655 
(not yet available) and https://github.com/tresata/spark-sorted (not part of 
spark, but it is available right now).  Hopefully that's what you are looking 
for.  To the best of my knowledge that covers what is available now / what is 
being worked on.

Imran

On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney wrote:
Hello all,

I am wondering if spark already has support for optimizations on sorted data 
and/or if such support could be added (I am comfortable dropping to a lower 
level if necessary to implement this, but I'm not sure if it is possible at 
all).

Context: we have a number of data sets which are essentially already sorted on 
a key. With our current systems, we can take advantage of this to do a lot of 
analysis in a very efficient fashion...merges and joins, for example, can be 
done very efficiently, as can folds on a secondary key and so on.

I was wondering if spark would be a fit for implementing these sorts of 
optimizations? Obviously it is sort of a niche case, but would this be 
achievable? Any pointers on where I should look?




Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
One option here would be to try Structured Streaming.  We've added an
option "failOnDataLoss" that will cause Spark to just skip ahead when this
exception is encountered (it's off by default though so you don't silently
miss data).
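
For illustration, a minimal sketch of where that option goes on the Structured Streaming Kafka source (the broker and topic names are placeholders; it requires the spark-sql-kafka package on the classpath):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FailOnDataLossExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("FailOnDataLossExample").getOrCreate();

        Dataset<Row> kafka = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
                .option("subscribe", "my-topic")                    // placeholder
                // Skip over offsets that have aged out of Kafka instead of failing the query.
                .option("failOnDataLoss", "false")
                .load();

        // Placeholder sink, just to have a runnable query.
        kafka.writeStream().format("console").start().awaitTermination();
    }
}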

On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <
ram.the.m...@gmail.com> wrote:

> I am using Spark streaming and reading data from Kafka using
> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> smallest.
>
> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
> and my spark job crashes.
>
> I want to understand if there is a graceful way to handle this failure and
> not kill the job. I want to keep ignoring these exceptions, as some other
> partitions are fine and I am okay with data loss.
>
> Is there any way to handle this and not have my spark job crash? I have no
> option of increasing the kafka retention period.
>
> I tried to have the DStream returned by createDirectStream() wrapped in a
> Try construct, but since the exception happens in the executor, the Try
> construct didn't take effect. Do you have any ideas of how to handle this?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: can spark take advantage of ordered data?

2017-03-10 Thread Koert Kuipers
this shouldn't be too hard. adding something to spark-sorted or to the
dataframe/dataset logical plan that says "trust me, i am already
partitioned and sorted" seems doable. however you most likely need a custom
hash partitioner, and you have to be careful to read the data in without
file splitting.
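
To make the "custom hash partitioner" part concrete, a sketch (mine, not Koert's code) of a Spark Partitioner that reproduces Hadoop HashPartitioner placement, which an already-partitioned RDD could declare; it assumes the Spark-side keys hash identically to the Hadoop-side keys (e.g. String vs. Text do not):

import org.apache.spark.Partitioner;

// Mirrors Hadoop's HashPartitioner: partition = (hashCode & Integer.MAX_VALUE) % numPartitions.
public class HadoopHashPartitioner extends Partitioner {
    private final int numPartitions;

    public HadoopHashPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Spark compares partitioners with equals() when deciding whether a
    // co-partitioned join can skip the shuffle.
    @Override
    public boolean equals(Object other) {
        return other instanceof HadoopHashPartitioner
                && ((HadoopHashPartitioner) other).numPartitions == numPartitions;
    }

    @Override
    public int hashCode() {
        return numPartitions;
    }
}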

On Mar 10, 2017 9:10 AM, "sourabh chaki"  wrote:

> My use case is also quite similar. I have 2 feeds. One 3TB and another
> 100GB. Both the feeds are generated by hadoop reduce operation and
> partitioned by hadoop hashpartitioner. 3TB feed has 10K partitions whereas
> 100GB file has 200 partitions.
>
> Now when I do a join between these two feeds using spark, spark shuffles
> both the RDDS and it takes long time to complete. Can we do something so
> that spark can recognise the existing partitions of 3TB feed and shuffles
> only 200GB feed?
> It can be mapside scan for bigger RDD and shuffle read from smaller RDD?
>
> I have looked at spark-sorted project, but that project does not utilise
> the pre-existing partitions in the feed.
> Any pointer will be helpful.
>
> Thanks
> Sourabh
>
> On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid 
> wrote:
>
>> Hi Jonathan,
>>
>> you might be interested in https://issues.apache.org/jira/browse/SPARK-3655
>> (not yet available) and https://github.com/tresata/spark-sorted
>> (not part of spark, but it is available right now).
>> Hopefully that's what you are looking for.  To the best of my knowledge that
>> covers what is available now / what is being worked on.
>>
>> Imran
>>
>> On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney 
>> wrote:
>>
>>> Hello all,
>>>
>>> I am wondering if spark already has support for optimizations on sorted
>>> data and/or if such support could be added (I am comfortable dropping to a
>>> lower level if necessary to implement this, but I'm not sure if it is
>>> possible at all).
>>>
>>> Context: we have a number of data sets which are essentially already
>>> sorted on a key. With our current systems, we can take advantage of this to
>>> do a lot of analysis in a very efficient fashion...merges and joins, for
>>> example, can be done very efficiently, as can folds on a secondary key and
>>> so on.
>>>
>>> I was wondering if spark would be a fit for implementing these sorts of
>>> optimizations? Obviously it is sort of a niche case, but would this be
>>> achievable? Any pointers on where I should look?
>>>
>>
>>
>


Re: can spark take advantage of ordered data?

2017-03-10 Thread sourabh chaki
My use case is also quite similar. I have 2 feeds. One 3TB and another
100GB. Both the feeds are generated by hadoop reduce operation and
partitioned by hadoop hashpartitioner. 3TB feed has 10K partitions whereas
100GB file has 200 partitions.

Now when I do a join between these two feeds using spark, spark shuffles
both the RDDS and it takes long time to complete. Can we do something so
that spark can recognise the existing partitions of 3TB feed and shuffles
only 200GB feed?
It can be mapside scan for bigger RDD and shuffle read from smaller RDD?

I have looked at spark-sorted project, but that project does not utilise
the pre-existing partitions in the feed.
Any pointer will be helpful.

Thanks
Sourabh

On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid  wrote:

> Hi Jonathan,
>
> you might be interested in https://issues.apache.org/jira/browse/SPARK-3655
> (not yet available) and https://github.com/tresata/spark-sorted
> (not part of spark, but it is available right now).
> Hopefully that's what you are looking for.  To the best of my knowledge that
> covers what is available now / what is being worked on.
>
> Imran
>
> On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney 
> wrote:
>
>> Hello all,
>>
>> I am wondering if spark already has support for optimizations on sorted
>> data and/or if such support could be added (I am comfortable dropping to a
>> lower level if necessary to implement this, but I'm not sure if it is
>> possible at all).
>>
>> Context: we have a number of data sets which are essentially already
>> sorted on a key. With our current systems, we can take advantage of this to
>> do a lot of analysis in a very efficient fashion...merges and joins, for
>> example, can be done very efficiently, as can folds on a secondary key and
>> so on.
>>
>> I was wondering if spark would be a fit for implementing these sorts of
>> optimizations? Obviously it is sort of a niche case, but would this be
>> achievable? Any pointers on where I should look?
>>
>
>


Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-10 Thread vaquar khan
Please read the Spark documentation at least once before asking the question.

http://spark.apache.org/docs/latest/streaming-programming-guide.html

http://2s7gjr373w3x22jf92z99mgm5w-wpengine.netdna-ssl.com/wp-content/uploads/2015/11/spark-streaming-datanami.png


Regards,
Vaquar khan


On Fri, Mar 10, 2017 at 6:17 AM, Sean Owen  wrote:

> Kafka and Spark Streaming don't do the same thing. Kafka stores and
> transports data, Spark Streaming runs computations on a stream of data.
> Neither is itself a streaming platform in its entirety.
>
> It's kind of like asking whether you should build a website using just
> MySQL, or nginx.
>
>
>> On 9 Mar 2017, at 20:37, Gaurav1809  wrote:
>>
>> Hi All, Would you please let me know which streaming platform is best. Be it
>> server log processing, social media feeds or any such streaming data. I want
>> to know the comparison between Kafka & Spark Streaming.
>>
>>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


Re: Question on Spark's graph libraries

2017-03-10 Thread Md. Rezaul Karim
+1

Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 10 March 2017 at 12:10, Robin East  wrote:

> I would love to know the answer to that too.
> 
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 9 Mar 2017, at 17:42, enzo  wrote:
>
> I am a bit confused by the current roadmap for graph and graph analytics
> in Apache Spark.
>
> I understand that we have had for some time two libraries (the following
> is my understanding - please amend as appropriate!):
>
> . GraphX, part of the Spark project.  This library is based on RDDs and it is
> only accessible via Scala.  It doesn't look like this library has been
> enhanced recently.
> . GraphFrames, independent (at the moment?) library for Spark.  This
> library is based on Spark DataFrames and accessible by Scala & Python. Last
> commit on GitHub was 2 months ago.
>
> GraphFrames came about with the promise of at some point being integrated into
> Apache Spark.
>
> I can see other projects coming up with interesting libraries and ideas
> (e.g. Graphulo on Accumulo, a new project with the goal of implementing
> the GraphBlas building blocks for graph algorithms on top of Accumulo).
>
> Where is Apache Spark going?
>
> Where are graph libraries in the roadmap?
>
>
>
> Thanks for any clarity brought to this matter.
>
> Enzo
>
>
>


Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-10 Thread Sean Owen
Kafka and Spark Streaming don't do the same thing. Kafka stores and
transports data, Spark Streaming runs computations on a stream of data.
Neither is itself a streaming platform in its entirety.

It's kind of like asking whether you should build a website using just
MySQL, or nginx.


> On 9 Mar 2017, at 20:37, Gaurav1809  wrote:
>
> Hi All, Would you please let me know which streaming platform is best. Be it
> server log processing, social media feeds or any such streaming data. I want
> to know the comparison between Kafka & Spark Streaming.
>
>


Re: Question on Spark's graph libraries

2017-03-10 Thread Robin East
I would love to know the answer to that too.
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 9 Mar 2017, at 17:42, enzo  wrote:
> 
> I am a bit confused by the current roadmap for graph and graph analytics in 
> Apache Spark.
> 
> I understand that we have had for some time two libraries (the following is 
> my understanding - please amend as appropriate!):
> 
> . GraphX, part of the Spark project.  This library is based on RDDs and it is only 
> accessible via Scala.  It doesn't look like this library has been enhanced 
> recently.
> . GraphFrames, independent (at the moment?) library for Spark.  This library 
> is based on Spark DataFrames and accessible by Scala & Python. Last commit on 
> GitHub was 2 months ago.
> 
> GraphFrames came about with the promise of at some point being integrated into 
> Apache Spark.
> 
> I can see other projects coming up with interesting libraries and ideas (e.g. 
> Graphulo on Accumulo, a new project with the goal of implementing the 
> GraphBlas building blocks for graph algorithms on top of Accumulo).
> 
> Where is Apache Spark going?
> 
> Where are graph libraries in the roadmap?
> 
> 
> 
> Thanks for any clarity brought to this matter.
> 
> Enzo



Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-10 Thread Robin East
As Jorn says there is no best. I would start with 
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101. This will 
help you form some meaningful questions about what tools suit which use cases. 
Most places have a selection of tools such as spark, kafka, flink, storm, flume 
and so on. 
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 9 Mar 2017, at 20:04, Jörn Franke  wrote:
> 
> I find this question strange. There is no best tool for every use case. For 
> example, both tools mentioned below are suitable for different purposes, 
> sometimes also complementary.
> 
>> On 9 Mar 2017, at 20:37, Gaurav1809  wrote:
>> 
>> Hi All, Would you please let me know which streaming platform is best. Be it
>> server log processing, social media feeds or any such streaming data. I want
>> to know the comparison between Kafka & Spark Streaming.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Which-streaming-platform-is-best-Kafka-or-Spark-Streaming-tp28474.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



[Spark Streaming][Spark SQL] Design suggestions needed for sessionization

2017-03-10 Thread Ramkumar Venkataraman
At high-level, I am looking to do sessionization. I want to combine events
based on some key, do some transformations and emit data to HDFS. The catch
is there are time boundaries, say, I group events in a window of 0.5 hours,
based on some timestamp key in the event. Typical event-time windowing +
key-based grouping stuff.

I have been trying to figure out ways to do it. 

The following approaches, which would otherwise be best, are ruled out:

1) Use event-time windows with watermarks and the possibility of updating
previous windows when late data arrives. This is possible in Spark 2, but it
is only in alpha. Also, the company I work for doesn't support Spark 2 yet.
2) Use mapWithState in spark 1.6, but we can't do event-time windows if I am
not mistaken. 

Other feasible approaches:

3) Use another data-store like HBase to store unfinished sessions. Every
window needs to find out which session a particular event will fit in by
doing a query on HBase. Least favored option, since we have to maintain
another component operationally.
4) Use HDFS to store unfinished sessions. At every window, we need to create
a DF on the unfinished sessions, join it with the current DStream, do
transformations, emit a tuple stream of finished and unfinished sessions and
write them to HDFS. 

Option #4 looks elegant, but the catch is the write of unfinished sessions.
We read from and write to the same HDFS location, and the write also needs to
do a SaveMode.Overwrite. I am seeing concurrency problems where the next
window's read DF doesn't find the files in HDFS (because they are being
overwritten by the write from the previous window).

Caused by: java.io.FileNotFoundException: File does not exist:
hdfs://horton/tmp/inprogress-events/part-r-0-a22e7b14-3207-4fb3-8db8-f5423ef0441d.gz.parquet
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1374)
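
Not an answer from the thread, but to make the race concrete: one hedged variation on option #4 is to never read and overwrite the same path within one window, for example by keying the unfinished-sessions directory on the batch time. A sketch against the Spark 1.6 DataFrame API (the paths, names and the finished/unfinished split are placeholders):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public final class SessionCheckpoint {
    // Assumes a 1.6-style batch loop where 'current' holds this window's events
    // and batchTimeMs/windowMs come from the streaming batch. The very first
    // batch would need a bootstrap (empty) directory, omitted here.
    public static DataFrame rollForward(SQLContext sqlc, DataFrame current,
                                        long batchTimeMs, long windowMs) {
        String prevDir = "hdfs:///tmp/unfinished-sessions/" + (batchTimeMs - windowMs);
        String currDir = "hdfs:///tmp/unfinished-sessions/" + batchTimeMs;

        // Read the previous window's unfinished sessions and combine with this window.
        DataFrame combined = sqlc.read().parquet(prevDir).unionAll(current);

        // ... split 'combined' into finished and unfinished sessions here ...
        DataFrame unfinished = combined;  // placeholder for the real split

        // Written to a directory no other window reads or writes, so the
        // SaveMode.Overwrite never races with a concurrent read.
        unfinished.write().mode(SaveMode.Overwrite).parquet(currDir);
        return unfinished;
    }
}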

So the questions I have:
1) Is option #4 of reading and writing DF to the same HDFS location (same
table basically) the right approach? If not, any alternatives?
2) I had fiddled with writing the DF at the end of a window to a temp
location and moving the files from temp to the expected folder (using HDFS
utils) at the beginning of the next window. That did not help much.
3) Is there a way to make the write at the end of every window block the
processing of the next window? I tried to force the creation of a new stage by
using coalesce, but that did not help much either.
4) Is there any other totally different approach that is possible? We know
option #3 works, but we don't want to maintain any other component
operationally.

Let me know your thoughts or if you need any more information. Any existing
pointers or SO answers on how people do sessionization in Spark 1.6 would
also help (I couldn't find anything that helped me).




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Spark-SQL-Design-suggestions-needed-for-sessionization-tp28480.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Ramkumar Venkataraman
Nope, but since we migrated to Spark 1.6 we haven't seen the errors yet. Not
sure if they were fixed in between releases, or if it is just a weird timing
thing that we haven't hit yet in 1.6 as well.

On Sat, Mar 4, 2017 at 12:00 AM, nimmi.cv [via Apache Spark User List] <
ml-node+s1001560n28454...@n3.nabble.com> wrote:

> Did you find out how ?
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534p28454.html
> To unsubscribe from How to gracefully handle Kafka
> OffsetOutOfRangeException, click here.
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534p28479.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

How can an RDD turn each of its elements into a new RDD?

2017-03-10 Thread Mars Xu
Hi users,

I'm new to RDD programming; my problem is as in the title. What I do is read
a source file through sc, then do a groupByKey to get a new RDD. Now I want to
do another groupByKey based on each element of the former RDD.

   For example, my source file is as follows:
hello_world,1
hello,1
hello_world_spark,3
hello_scala,4
spark_rdd,1
spark_rdd_program,1
spark,1
spark_sql,3
   

  After my first round of groupByKey, I get an RDD like this:
   hello,((world,1),(world_spark,3),(scala,4))
   spark,((rdd,1),(rdd_program,1),(sql,3))

  
  The next step is where my problem is: I want to groupByKey on the values'
content, like "world/rdd/scala/sql". It seems I need to group by every element's
value, but Spark does not support nested RDDs, so what can I do to solve this?

  Actually, what I am doing is building a tree: every node is a word in a
sentence, and the root node is null. In my example, the two children of the root
node are "hello" and "spark"; "hello" has two children (world and scala), and "spark" also
has two children (rdd and sql).
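
Not from the thread, but one common way around the no-nested-RDDs restriction is to keep everything in a single flat pair RDD: for each record, emit one (parent, child) edge per step of the underscore path and aggregate those, which yields the tree's edges and counts without grouping inside a group. A hedged sketch assuming the Spark 2.x Java API and placeholder paths:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordTreeEdges {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordTreeEdges");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Lines look like "hello_world_spark,3".
        JavaRDD<String> lines = sc.textFile("hdfs:///input/words.txt");  // placeholder path

        // Emit one ((parent, child), count) record per edge of the path,
        // using "ROOT" as the parent of the first word.
        JavaPairRDD<Tuple2<String, String>, Long> edges = lines.flatMapToPair(line -> {
            String[] parts = line.split(",");
            String[] words = parts[0].split("_");
            long count = Long.parseLong(parts[1].trim());
            List<Tuple2<Tuple2<String, String>, Long>> out = new ArrayList<>();
            String parent = "ROOT";
            for (String word : words) {
                out.add(new Tuple2<>(new Tuple2<>(parent, word), count));
                parent = word;
            }
            return out.iterator();
        });

        // Sum counts per edge; the collected edge list is enough to assemble the tree on the driver.
        for (Tuple2<Tuple2<String, String>, Long> e : edges.reduceByKey(Long::sum).collect()) {
            System.out.println(e._1()._1() + " -> " + e._1()._2() + " : " + e._2());
        }

        sc.stop();
    }
}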

Please help.
Thanks very, very much.

Mars.
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Case class with POJO - encoder issues

2017-03-10 Thread geoHeil
http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset
describes the problem. Actually, I have the same problem. Is there a simple
way to build such an Encoder that serializes into multiple fields? I would
not want to replicate the whole JTS geometry class hierarchy only for Spark.
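
Not a full answer, but for reference, a hedged sketch of the usual stopgap, a Kryo-based encoder. Note that it serializes the whole object into a single binary column, so it does not give the multiple-fields layout asked about above; getting typed fields would mean mapping to a wrapper bean, which is exactly the duplication the question wants to avoid.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;

public class GeometryDatasetExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("GeometryDatasetExample").getOrCreate();

        // Kryo-based encoder: the whole Geometry ends up in one opaque binary column.
        Encoder<Geometry> geometryEncoder = Encoders.kryo(Geometry.class);

        GeometryFactory factory = new GeometryFactory();
        Geometry point = factory.createPoint(new Coordinate(1.0, 2.0));

        Dataset<Geometry> ds = spark.createDataset(Arrays.asList(point), geometryEncoder);
        ds.show();  // shows the serialized bytes, illustrating the single-column layout

        spark.stop();
    }
}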



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Case-class-with-POJO-encoder-issues-tp28381p28478.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org