a way to allow spark job to continue despite task failures?

2015-11-13 Thread Nicolae Marasoiu
Hi,


I know a task can fail 2 times and only the 3rd failure breaks the entire job.

I am good with this number of attempts.

I would like Spark, after trying a task 3 times, to move on to the other tasks.

The job can be marked "failed", but I want all tasks to run.

Please see my use case.


I read a Hadoop input set, and some gzip files are incomplete. I would like to just skip them, and the only way I see is to tell Spark to ignore some permanently failing tasks, if that is possible. With traditional Hadoop map-reduce this was possible using mapred.max.map.failures.percent.


Do map-reduce params like mapred.max.map.failures.percent apply to Spark jobs on YARN?


I edited $HADOOP_CONF_DIR/mapred-site.xml and added mapred.max.map.failures.percent=30, but it does not seem to apply; the job still failed after 3 task attempts failed.


Should Spark transmit this parameter? Or do the mapred.* parameters not apply at all?

Are other Hadoop parameters taken into account and transmitted - e.g. the ones involved in input reading, as opposed to "processing" or "application" settings like this max.map.failures? I saw that Spark should scan HADOOP_CONF_DIR and forward those, but I guess this does not apply to every parameter, since Spark has its own distribution & DAG stage processing logic, which just happens to have a YARN implementation.


Do you know a way to do this in Spark - to tolerate a predefined number of task failures but let the job continue? This way I could see all the faulty input files in one job run, delete them all, and continue with the rest.
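The only alternative I can think of is probing each file in its own tiny job and collecting the ones that blow up - a rough sketch, where `files` stands for the list of input paths (e.g. from a FileSystem listing):

    // Rough sketch, untested: a corrupt gzip only fails its own per-file count,
    // so the scan over all files can finish and report every bad path.
    val badFiles = files.filter { path =>
      try {
        sc.textFile(path).count()   // forces a full read/decompress of this one file
        false
      } catch {
        case _: Exception => true   // this per-file job aborted after the usual task retries
      }
    }

But that launches one job per file, which is exactly the overhead I was hoping to avoid.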


Just to mention, doing a manual gzip -t on top of hadoop cat is infeasible; map-reduce is way faster at scanning the 15K files worth 70GB (it is doing 25M/s per node), while the old-style hadoop cat does much less.


Thanks,

Nicu


Re: spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-02 Thread Nicolae Marasoiu
Hi,


Set 10ms and spark.streaming.backpressure.enabled=true


This should automatically delay the next batch until the current one is processed, or at least, over a few batches/periods, balance the consume/process rate against the ingestion rate.
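In case the exact config names help, this is roughly how I would set them - a sketch only, with placeholder values to tune empirically:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch only; both numeric values are placeholders.
    val conf = new SparkConf()
      .set("spark.streaming.kafka.maxRatePerPartition", "1000") // max messages/sec read per Kafka partition
      .set("spark.streaming.backpressure.enabled", "true")      // Spark 1.5+: adapt ingestion rate to processing rate
    val ssc = new StreamingContext(conf, Seconds(10))            // batch interval, also a placeholder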


Nicu


From: Cody Koeninger 
Sent: Thursday, October 1, 2015 11:46 PM
To: Sourabh Chandak
Cc: user
Subject: Re: spark.streaming.kafka.maxRatePerPartition for direct stream

That depends on your job, your cluster resources, the number of seconds per 
batch...

You'll need to do some empirical work to figure out how many messages per batch 
a given executor can handle.  Divide that by the number of seconds per batch.
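To make that concrete - illustrative arithmetic only, every number below is made up, and note the config is expressed in messages per second per Kafka partition:

    val messagesPerBatch = 1200000L   // measured empirically: what one batch can absorb
    val kafkaPartitions  = 12
    val batchSeconds     = 10
    val maxRatePerPartition = messagesPerBatch / kafkaPartitions / batchSeconds   // = 10000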



On Thu, Oct 1, 2015 at 3:39 PM, Sourabh Chandak wrote:
Hi,

I am writing a Spark Streaming job using the direct stream method for Kafka and wanted to handle the case of a checkpoint failure, when we'll have to reprocess the entire data from the start. By default, for every new checkpoint it tries to load everything from each partition, and that takes a lot of time to process. After some searching I found that there is a config, spark.streaming.kafka.maxRatePerPartition, which can be used to tackle this. My question is: what would be a suitable range for this config if we have ~12 million messages in Kafka with a maximum message size of ~10 MB?

Thanks,
Sourabh



Re: How to save DataFrame as a Table in Hbase?

2015-10-02 Thread Nicolae Marasoiu
Hi,

Phoenix, an SQL coprocessor for HBase, has ingestion integration with DataFrames in its 4.x versions.
For HBase and RDDs in general there are multiple solutions: the hbase-spark module by Cloudera, which will be part of a future HBase release, hbase-rdd by unicredit, and many others.
I am not sure the fact that an RDD is a DataFrame or a normal RDD is relevant for storage. I think the main advantage of a DataFrame is economical memory usage, efficient scans through the data in memory, and processing in general; but when mapping to an outside schema, you have the same data to map to a schema specific to the external DB. For instance, saving granular values in separate columns or bundling them together into arrays of concatenated values is a choice that seems to be independent of how the RDD is represented on the Spark side - normal RDD or DataFrame - and more a storage tradeoff between space & speed for various use cases (data access patterns).
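If you just want to push the rows from Spark into HBase without any of those libraries, a minimal sketch of the plain TableOutputFormat route is below - untested, and it assumes the HBase client jars are on the classpath, a table "mytable" with column family "cf" already exists, and the first two string columns of the DataFrame `df` are the row key and the value:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "mytable")   // hypothetical table name
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Each DataFrame row becomes one Put: column 0 is the row key, column 1 the cell value.
    val puts = df.rdd.map { row =>
      val put = new Put(Bytes.toBytes(row.getString(0)))
      // add() is addColumn() in newer HBase client versions
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes(row.getString(1)))
      (new ImmutableBytesWritable, put)
    }
    puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

The column layout (one cell per value vs concatenated arrays) is exactly the storage tradeoff mentioned above, and it is independent of whether the source was a DataFrame or a plain RDD.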

Nicu

From: unk1102 
Sent: Friday, October 2, 2015 1:15 AM
To: user@spark.apache.org
Subject: How to save DataFrame as a Table in Hbase?

Hi, has anybody tried to save a DataFrame into HBase? I have processed data in a DataFrame which I need to store in HBase so that my web UI can access it from HBase. Please guide. Thanks in advance.








Re: Kafka Direct Stream

2015-10-01 Thread Nicolae Marasoiu
Hi,


If you just need processing per topic, why not generate N different Kafka direct streams? When creating a Kafka direct stream you pass a list of topics - just give it one.


Then the reusable part of your computations should be extractable as 
transformations/functions and reused between the streams.
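A rough sketch of what I mean - the topic names are placeholders, and `ssc` and `kafkaParams` are assumed to exist already:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    // The shared part of the pipeline, written once and applied to every stream.
    def process(stream: DStream[(String, String)]): Unit = {
      // placeholder for the real shared transformations; print() just to have an output operator
      stream.print()
    }

    // One direct stream per topic.
    Seq("topicA", "topicB", "topicC").foreach { topic =>
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
      process(stream)
    }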


Nicu



From: Adrian Tanase 
Sent: Thursday, October 1, 2015 5:47 PM
To: Cody Koeninger; Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

On top of that you could make the topic part of the key (e.g. keyBy in 
.transform or manually emitting a tuple) and use one of the .xxxByKey operators 
for the processing.

If you have a stable, domain specific list of topics (e.g. 3-5 named topics) 
and the processing is really different, I would also look at filtering by topic 
and saving them as different DStreams in your code.

Either way you need to start with Cody's tip in order to extract the topic name.

-adrian

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:06 PM
To: Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

You can get the topic for a given partition from the offset range. You can either filter using that, or just have a single RDD and match on the topic when doing mapPartitions or foreachPartition (which I think is a better idea).

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
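A minimal sketch of that pattern, where `stream` is the DStream returned by createDirectStream:

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // offsetRanges(i) describes the Kafka topic/partition behind RDD partition i
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val topic = offsetRanges(TaskContext.get.partitionId).topic
        topic match {
          case "topicA" => // topicA-specific processing of iter goes here
          case _        => // processing for the remaining topics
        }
      }
    }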




On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta wrote:
Hi,

I am using the Spark direct stream to consume from multiple topics in Kafka. I am able to consume fine, but I am stuck on how to separate the data for each topic, since I need to process the data differently depending on the topic.
I basically want to split the RDD consisting of N topics into N RDDs, each having 1 topic.

Any help would be appreciated.

Thanks in advance,
Udit



Re: Problem understanding spark word count execution

2015-10-01 Thread Nicolae Marasoiu
Hi,

So you say "sc.textFile -> flatMap -> Map".

My understanding is like this:
The first step is that a number of partitions are determined - p of them. You can give a hint on this.
Then the nodes which will load the p partitions are chosen - n nodes, where n <= p.

Roughly at the same time or not, the n nodes start opening different sections of the file - the physical equivalent of the partitions: for instance with HDFS they would do an open and a seek, I guess, and just read from the stream there, converting to whatever the InputFormat dictates.

The shuffle can only be the part where a node opens an HDFS file, for instance, but does not have a local replica of the blocks it needs to read (those pertaining to its assigned partitions). So it needs to pick them up from remote nodes which do have replicas of that data.

After the blocks are read into memory, flatMap and map are local computations generating new RDDs, and in the end the result is sent to the driver (whatever the terminal computation does on the RDD, like the result of reduce, or the side effects of rdd.foreach, etc.).
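For concreteness, the usual word count shape I assume we are talking about - a sketch, with a placeholder input path:

    val counts = sc.textFile("hdfs:///path/to/input")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)   // reduceByKey is the step that introduces a shuffle / stage boundary
    counts.collect()         // terminal action: the final counts come back to the driver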

Maybe you can share more of your context if still unclear.
I just made assumptions to give clarity on a similar thing.

Nicu

From: Kartik Mathur <kar...@bluedata.com>
Sent: Thursday, October 1, 2015 10:25 PM
To: Nicolae Marasoiu
Cc: user
Subject: Re: Problem understanding spark word count execution

Thanks Nicolae,
So in my case all executors are sending results back to the driver, and "shuffle is just sending out the textFile to distribute the partitions" - could you please elaborate on this? What exactly is in this file?


On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu 
<nicolae.maras...@adswizz.com<mailto:nicolae.maras...@adswizz.com>> wrote:


Hi,

2 - the end results are sent back to the driver; the shuffles are transmissions of intermediate results between nodes, such as the "->" steps, which are all intermediate transformations.

More precisely, since flatMap and map are narrow dependencies, meaning they can 
usually happen on the local node, I bet shuffle is just sending out the 
textFile to a few nodes to distribute the partitions.



From: Kartik Mathur <kar...@bluedata.com<mailto:kar...@bluedata.com>>
Sent: Thursday, October 1, 2015 12:42 AM
To: user
Subject: Problem understanding spark word count execution

Hi All,

I tried running the Spark word count example and I have a couple of questions -

I am analyzing stage 0, i.e.
 sc.textFile -> flatMap -> Map (Word count example)

1) In the stage logs under the application UI details, for every task I am seeing a shuffle write of 2.7 KB. Question - how can I know where this task wrote? Like how many bytes went to which executor?

2) In the executor's log, when I look for the same task, it says 2000 bytes of result were sent to the driver. My question is, if the results were sent directly to the driver, what is this shuffle write?

Thanks,
Kartik



Re: sc.parallelize with defaultParallelism=1

2015-09-30 Thread Nicolae Marasoiu
That's exactly what I am doing, but my question is whether parallelize sends the data to a worker node. From a performance perspective, on small sets the ideal would be to load it into the local JVM memory of the driver. I mean, even designating the current machine as a worker node, besides the driver, would still mean localhost loopback/network communication. I guess Spark is a batch-oriented system, and I am still checking whether there are ways to use it like this too: load data manually, but process it with the functional & other Spark libraries, without the distribution or map-reduce part.
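A rough sketch of the two options I am weighing - the HBase loading helper is hypothetical, just to stand in for a small client-side read:

    // Hypothetical small load done with the plain HBase client, on the driver.
    val data: Seq[(String, Int)] = loadSmallDatasetFromHBase()

    // Option A: stay in the driver JVM - plain Scala collections, no tasks at all.
    val localAgg = data.groupBy(_._1).mapValues(_.map(_._2).sum)

    // Option B: hand it to Spark; the collection is shipped out when an action runs.
    val sparkAgg = sc.parallelize(data, 1).reduceByKey(_ + _).collectAsMap()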



From: Andy Dang <nam...@gmail.com>
Sent: Wednesday, September 30, 2015 8:17 PM
To: Nicolae Marasoiu
Cc: user@spark.apache.org
Subject: Re: sc.parallelize with defaultParallelism=1

Can't you just load the data from HBase first, and then call sc.parallelize on 
your dataset?

-Andy

---
Regards,
Andy (Nam) Dang

On Wed, Sep 30, 2015 at 12:52 PM, Nicolae Marasoiu 
<nicolae.maras...@adswizz.com<mailto:nicolae.maras...@adswizz.com>> wrote:

Hi,


When calling sc.parallelize(data, 1), is there a preference for where to put the data? I see 2 possibilities: sending it to a worker node, or keeping it in the driver program.

I would prefer to keep the data local to the driver. The use case is when I just need to load a bit of data from HBase and then compute over it, e.g. aggregate, using Spark.


Thanks,

Nicu



Re: [cache eviction] partition recomputation in big lineage RDDs

2015-09-30 Thread Nicolae Marasoiu
Hi,


An equivalent question would be: can the memory cache be selectively evicted from within a component running in the driver? I know it breaks some abstraction/encapsulation, but clearly I need to evict part of the cache so that it gets reloaded with newer values from the DB.

Because what I basically need is to invalidate some portions of the data which have newer values. The "compute" method should stay the same (read with TableInputFormat).
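The simplest thing I can see works per RDD rather than per partition - a sketch, assuming `staleRdd` is the cached reference and `hbaseConf` is already configured for the table:

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    // Drop the stale cached blocks for the whole RDD...
    staleRdd.unpersist(blocking = false)

    // ...then rebuild it from the current HBase contents and cache it again.
    val refreshed = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    refreshed.cache()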

Thanks
Nicu
____
From: Nicolae Marasoiu <nicolae.maras...@adswizz.com>
Sent: Wednesday, September 30, 2015 4:07 PM
To: user@spark.apache.org
Subject: Re: partition recomputation in big lineage RDDs


Hi,

In fact, my RDD will get a new version (a new RDD assigned to the same var) quite frequently, by merging bulks of 1000 events from the last 10s.

But recomputation would be more efficient if done not by reading the initial RDD partition(s) and reapplying deltas, but by reading the latest data from HBase and just computing on top of that, if anything.

Basically I guess I need to write my own RDD and implement its compute method by sliding over HBase.

Thanks,
Nicu
________
From: Nicolae Marasoiu <nicolae.maras...@adswizz.com>
Sent: Wednesday, September 30, 2015 3:05 PM
To: user@spark.apache.org
Subject: partition recomputation in big lineage RDDs


Hi,


If I implement a manner to have an up-to-date version of my RDD by ingesting some new events, called RDD_inc (from "increment"), and I provide a "merge" function m(RDD, RDD_inc) which returns RDD_new, it looks like I can evolve the state of my RDD by constructing new RDDs all the time, in a manner that hopes to reuse as much data as possible from the past RDD and make the rest garbage-collectable. An example merge function would be a join on some ids, creating a merged state for each element. The type of the result of m(RDD, RDD_inc) is the same as that of RDD.
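For concreteness, the kind of merge I have in mind - a sketch, where the Long key and the Event element type are placeholders:

    import org.apache.spark.rdd.RDD

    case class Event(id: Long, value: String, ts: Long)   // placeholder element type

    // Keep the incremental record when both sides have one, otherwise whichever side exists.
    def merge(state: RDD[(Long, Event)], inc: RDD[(Long, Event)]): RDD[(Long, Event)] =
      state.fullOuterJoin(inc).mapValues {
        case (_, Some(newer))    => newer
        case (Some(older), None) => older
        case (None, None)        => sys.error("unreachable: a full outer join emits at least one side")
      }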


My question is: how does recomputation work for such an RDD, which is not the direct result of an HDFS load, but the result of a long lineage of such functions/transformations?

Let's say my RDD is now, after 2 merge iterations, like this:

RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)

When recomputing a part of RDD_new, here are my assumptions:

- only full partitions are recomputed, nothing more granular?

- the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed

- the functions are applied

And this seems simplistic, since in the general case the partitions do not fully align between all these RDDs. The other aspect is the potentially redundant loading of data which is in fact no longer required (the data ruled out in the merge).


A more detailed version of this question is at 
https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/


Thanks,

Nicu


sc.parallelize with defaultParallelism=1

2015-09-30 Thread Nicolae Marasoiu
Hi,


When calling sc.parallelize(data, 1), is there a preference for where to put the data? I see 2 possibilities: sending it to a worker node, or keeping it in the driver program.

I would prefer to keep the data local to the driver. The use case is when I just need to load a bit of data from HBase and then compute over it, e.g. aggregate, using Spark.


Thanks,

Nicu


Re: partition recomputation in big lineage RDDs

2015-09-30 Thread Nicolae Marasoiu
Hi,

In fact, my RDD will get a new version (a new RDD assigned to the same var) quite frequently, by merging bulks of 1000 events from the last 10s.

But recomputation would be more efficient if done not by reading the initial RDD partition(s) and reapplying deltas, but by reading the latest data from HBase and just computing on top of that, if anything.

Basically I guess I need to write my own RDD and implement its compute method by sliding over HBase.

Thanks,
Nicu

From: Nicolae Marasoiu <nicolae.maras...@adswizz.com>
Sent: Wednesday, September 30, 2015 3:05 PM
To: user@spark.apache.org
Subject: partition recomputation in big lineage RDDs


Hi,


If I implement a manner to have an up-to-date version of my RDD by ingesting some new events, called RDD_inc (from "increment"), and I provide a "merge" function m(RDD, RDD_inc) which returns RDD_new, it looks like I can evolve the state of my RDD by constructing new RDDs all the time, in a manner that hopes to reuse as much data as possible from the past RDD and make the rest garbage-collectable. An example merge function would be a join on some ids, creating a merged state for each element. The type of the result of m(RDD, RDD_inc) is the same as that of RDD.

My question is: how does recomputation work for such an RDD, which is not the direct result of an HDFS load, but the result of a long lineage of such functions/transformations?

Let's say my RDD is now, after 2 merge iterations, like this:

RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)

When recomputing a part of RDD_new, here are my assumptions:

- only full partitions are recomputed, nothing more granular?

- the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed

- the functions are applied

And this seems simplistic, since in the general case the partitions do not fully align between all these RDDs. The other aspect is the potentially redundant loading of data which is in fact no longer required (the data ruled out in the merge).


A more detailed version of this question is at 
https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/


Thanks,

Nicu


partition recomputation in big lineage RDDs

2015-09-30 Thread Nicolae Marasoiu
Hi,


If I implement a manner to have an up-to-date version of my RDD by ingesting some new events, called RDD_inc (from "increment"), and I provide a "merge" function m(RDD, RDD_inc) which returns RDD_new, it looks like I can evolve the state of my RDD by constructing new RDDs all the time, in a manner that hopes to reuse as much data as possible from the past RDD and make the rest garbage-collectable. An example merge function would be a join on some ids, creating a merged state for each element. The type of the result of m(RDD, RDD_inc) is the same as that of RDD.

My question is: how does recomputation work for such an RDD, which is not the direct result of an HDFS load, but the result of a long lineage of such functions/transformations?

Let's say my RDD is now, after 2 merge iterations, like this:

RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)

When recomputing a part of RDD_new, here are my assumptions:

- only full partitions are recomputed, nothing more granular?

- the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed

- the functions are applied

And this seems simplistic, since in the general case the partitions do not fully align between all these RDDs. The other aspect is the potentially redundant loading of data which is in fact no longer required (the data ruled out in the merge).


A more detailed version of this question is at 
https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/


Thanks,

Nicu


Re: Problem understanding spark word count execution

2015-09-30 Thread Nicolae Marasoiu

Hi,

2 - the end results are sent back to the driver; the shuffles are transmissions of intermediate results between nodes, such as the "->" steps, which are all intermediate transformations.

More precisely, since flatMap and map are narrow dependencies, meaning they can 
usually happen on the local node, I bet shuffle is just sending out the 
textFile to a few nodes to distribute the partitions.



From: Kartik Mathur 
Sent: Thursday, October 1, 2015 12:42 AM
To: user
Subject: Problem understanding spark word count execution

Hi All,

I tried running the Spark word count example and I have a couple of questions -

I am analyzing stage 0, i.e.
 sc.textFile -> flatMap -> Map (Word count example)

1) In the stage logs under the application UI details, for every task I am seeing a shuffle write of 2.7 KB. Question - how can I know where this task wrote? Like how many bytes went to which executor?

2) In the executor's log, when I look for the same task, it says 2000 bytes of result were sent to the driver. My question is, if the results were sent directly to the driver, what is this shuffle write?

Thanks,
Kartik