Re: EDI (Electronic Data Interchange) parser on Spark

2018-03-13 Thread Darin McBeath
 I'm not familiar with EDI, but perhaps one option might be spark-xml-utils 
(https://github.com/elsevierlabs-os/spark-xml-utils).  You could transform the 
XML to the XML format required by  the xml-to-json function and then return the 
json.  Spark-xml-utils wraps the open source Saxon project and supports XPath, 
XQuery, and XSLT.    Spark-xml-utils doesn't parallelize the parsing of an 
individual document, but if you have your documents split across a cluster, the 
processing can be parallelized.  We use this package extensively within our 
company to process millions of XML records.  If you happen to be attending 
Spark summit in a few months, someone will be presenting on this topic 
(https://databricks.com/session/mining-the-worlds-science-large-scale-data-matching-and-integration-from-xml-corpora).

Below is a snippet for xquery.
let $retval :=            {$doi}       {$cid}       {$pii}       {$content-type}       {$srctitle}       {$document-type}       {$document-subtype}       {$publication-date}       {$article-title}       {$issn}       {$isbn}           
 {$lang}        {$tables}       return 
xml-to-json($retval)

Darin.
On Tuesday, March 13, 2018, 8:52:42 AM EDT, Aakash Basu 
 wrote:  
 
 Hi Jörn,

Thanks for a quick revert. I already built a EDI to JSON parser from scratch 
using the 811 and 820 standard mapping document. It can run on any standard and 
for any type of EDI. But my built is in native python and doesn't leverage 
Spark's parallel processing, which I want to do for large and huge amount of 
EDI data.

Any pointers on that?

Thanks,
Aakash.

On Tue, Mar 13, 2018 at 3:44 PM, Jörn Franke  wrote:

Maybe there are commercial ones. You could also some of the open source parser 
for xml.

However xml is very inefficient and you need to du a lot of tricks to make it 
run in parallel. This also depends on type of edit message etc. sophisticated 
unit testing and performance testing is key.

Nevertheless it is also not as difficult as I made it sound now.

> On 13. Mar 2018, at 10:36, Aakash Basu  wrote:
>
> Hi,
>
> Did anyone built parallel and large scale X12 EDI parser to XML or JSON using 
> Spark?
>
> Thanks,
> Aakash.


  

How to find the partitioner for a Dataset

2016-09-07 Thread Darin McBeath
I have a Dataset (om) which I created and repartitioned (and cached) using
one of the fields (docId).  Reading the Spark documentation, I would assume
the om Dataset should be hash partitioned.  But, how can I verify this?

When I do om.rdd.partitioner I get 

Option[org.apache.spark.Partitioner] = None

I thought I would have seen HashPartitioner.  But, perhaps this is not
equivalent.

The reason I ask is that when I use this cached Dataset in a join with
another Dataset (partitioned on the same column and cached) I see things
like the following in my explain which makes me think the Dataset might have
lost the partitioner.  I also see a couple of stages for the job where it
seems like each Dataset in my join is being read in and shuffled out again
(I'm assuming for the hash partitioning required by the join)

Exchange hashpartitioning(_1#6062.docId, 8)

Any thoughts/ideas would be appreciated.

Thanks.

Darin.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-the-partitioner-for-a-Dataset-tp27672.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Datasets and Partitioners

2016-09-06 Thread Darin McBeath
How do you find the partitioner for a Dataset?

I have a Dataset (om) which I created and repartitioned using one of the fields 
(docId).  Reading the documentation, I would assume the om Dataset should be 
hash partitioned.  But, how can I verify this?

When I do om.rdd.partitioner I get 

Option[org.apache.spark.Partitioner] = None

But, perhaps this is not equivalent.

The reason I ask is that when I use this cached Dataset in a join with another 
Dataset (partitioned on the same column and cached) I see things like the 
following in my explain which makes me think the Dataset might have lost the 
partitioner.  I also see a couple of stages for the job where it seems like 
each Dataset in my join is being read in and shuffled out again (I'm assuming 
for the hash partitioning required by the join)

Exchange hashpartitioning(_1#6062.docId, 8)

Any thoughts/ideas would be appreciated.

Thanks.

Darin.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Dataset Filter performance - trying to understand

2016-09-01 Thread Darin McBeath
I've been trying to understand the performance of Datasets (and filters) in 
Spark 2.0. 

I have a Dataset which I've read from a parquet file and cached into memory 
(deser).  This is spread across 8 partitions and consumes a total of 826MB of 
memory on my cluster.  I verified that the dataset was 100% cached in memory by 
looking at the Spark UI.

I'm using an AWS c3.2xlarge for my 1 worker (8 cores).

There are 108,587,678 total records in my cached dataset (om).

I run the following command (against this cached Dataset) and it takes 13.56s.

om.filter(textAnnotation => textAnnotation.annotType == "ce:para").count

This returns a count of 1,039,993

When I look at the explain() for this query, I see the following:

== Physical Plan ==

*Filter .apply+- InMemoryTableScan [docId#394, annotSet#395, 
annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, 
orig#401, lemma#402, pos#403, xmlId#404], [.apply]
+- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, 
endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, 
xmlId#404], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
+- Exchange hashpartitioning(docId#394, 8)

...
I was a bit perplexed why this takes so long as I had read that Spark could 
filter 1B rows a second on a single cpu.  Granted, my row is likely more 
complex but I thought it should be faster than 13+ seconds to read in 100M rows 
that had been cached into memory.

So, I modified the above query to the following:

om.filter("annotType == 'ce:para'").count

The query now completes in just over 1s (a huge improvement).

When I do the explain plan for this query, I see the following:

== Physical Plan ==
*Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))
+- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, 
endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, 
xmlId#404], [isnotnull(annotType#396), (annotType#396 = ce:para)]
+- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, 
endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, 
xmlId#404], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
+- Exchange hashpartitioning(docId#394, 8)

This is very similar to the first with the notable exception of 

*Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))  
instead of

*Filter .apply

I'm guessing the improved performance is because the object TextAnnotation must 
be created in the first example (and not the second).  Although, this is not 
clear from the explain plans.  Is that correct?  Or is there some other reason 
why the second approach is significantly faster?  I would really like to get a 
solid understanding for why the performance of the second query is so much 
faster.

I also want to clarify whether the InMemoryTableScan and inMemoryRelation are 
part of the whole-stage code generation.  I'm thinking they aren't as they 
aren't prefixed by a "*".  If not, is there something I could do to make take 
this part of whole-stage code generation?

My goal is to make the above operation as fast as possible.  I could of course 
increase the partitions (and the size of my cluster) but I also want to clarify 
my understanding of whole-stage code generation. 

Any thought/suggestions would be appreciated.  Also, if anyone has found good 
resources that further explain the details of the DAG and whole-stage code 
generation, I would appreciate those as well.

Thanks.

Darin.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Best way to read XML data from RDD

2016-08-22 Thread Darin McBeath
Yes, you can use it for single line XML or even a multi-line XML.
In our typical mode of operation, we have sequence files (where the value is 
the XML).  We then run operations over the XML to extract certain values or to 
transform the XML into another format (such as json).
If i understand your question, your content is in json.  Some of the values 
within this json are XML strings.  You should be able to use spark-xml-utils to 
parse this string and filter/evaluate the result of an xpath expression (or 
xquery/xslt).
One limitation of spark-xml-utils when using the evaluate operation is that it 
returns a string.  So, you have to be a little creative when returning multiple 
values (such as delimiting the values with a special character and then 
splitting on this delimiter).  
Darin.

  From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
 To: Darin McBeath <ddmcbe...@yahoo.com>; Hyukjin Kwon <gurwls...@gmail.com>; 
Jörn Franke <jornfra...@gmail.com> 
Cc: Felix Cheung <felixcheun...@hotmail.com>; user <user@spark.apache.org>
 Sent: Monday, August 22, 2016 6:53 AM
 Subject: Re: Best way to read XML data from RDD
   
Hi Darin, 
Ate  you  using  this  utility  to  parse single line XML?

Sent from Samsung Mobile.

 Original message From: Darin McBeath <ddmcbe...@yahoo.com> 
Date:21/08/2016 17:44 (GMT+05:30) To: Hyukjin Kwon <gurwls...@gmail.com>, Jörn 
Franke <jornfra...@gmail.com> Cc: Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com>, Felix Cheung <felixcheun...@hotmail.com>, user 
<user@spark.apache.org> Subject: Re: Best way to read XML data from RDD 
Another option would be to look at spark-xml-utils.  We use this extensively in 
the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils



There are quite a few examples.  Depending on your preference (and what you 
want to do), you could use xpath, xquery, or xslt to transform, extract, or 
filter.

Like mentioned below, you want to initialize the parser in a mapPartitions call 
(one of the examples shows this).

Hope this is helpful.

Darin.






From: Hyukjin Kwon <gurwls...@gmail.com>
To: Jörn Franke <jornfra...@gmail.com> 
Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>; Felix Cheung 
<felixcheun...@hotmail.com>; user <user@spark.apache.org>
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD



Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325


I hope this is helpful.




2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:

I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
>Otherwise you could just create one XML Parser object / node, but sharing this 
>among the parallel tasks on the same node is tricky.
>The other possibility could be simply more hardware ...
>
>On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> 
>wrote:
>
>
>Yes . It accepts a xml file as source but not RDD. The XML data embedded  
>inside json is streamed from kafka cluster.  So I could get it as RDD. 
>>Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
>>function  but  performance  wise I am not happy as it takes 4 minutes to 
>>parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 
>>
>>
>>
>>
>>Sent from Samsung Mobile.
>>
>>
>> Original message 
>>From: Felix Cheung <felixcheun...@hotmail.com> 
>>Date:20/08/2016  09:49  (GMT+05:30) 
>>To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> , user 
>><user@spark.apache.org> 
>>Cc: 
>>Subject: Re: Best way to read XML data from RDD 
>>
>>
>>Have you tried
>>
>>https://github.com/databricks/ spark-xml
>>?
>>
>>
>>
>>
>>
>>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
>><diwakar.dhanusk...@gmail.com> wrote:
>>
>>
>>Hi,  
>>
>>
>>There is a RDD with json data. I could read json data using rdd.read.json . 
>>The json data has XML data in couple of key-value paris. 
>>
>>
>>Which is the best method to read and parse XML from rdd. Is there any 
>>specific xml libraries for spark. Could anyone help on this.
>>
>>
>>Thanks.


  

Re: Best way to read XML data from RDD

2016-08-21 Thread Darin McBeath
Another option would be to look at spark-xml-utils.  We use this extensively in 
the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils



There are quite a few examples.  Depending on your preference (and what you 
want to do), you could use xpath, xquery, or xslt to transform, extract, or 
filter.

Like mentioned below, you want to initialize the parser in a mapPartitions call 
(one of the examples shows this).

Hope this is helpful.

Darin.






From: Hyukjin Kwon 
To: Jörn Franke  
Cc: Diwakar Dhanuskodi ; Felix Cheung 
; user 
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD



Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325


I hope this is helpful.




2016-08-20 15:10 GMT+09:00 Jörn Franke :

I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
>Otherwise you could just create one XML Parser object / node, but sharing this 
>among the parallel tasks on the same node is tricky.
>The other possibility could be simply more hardware ...
>
>On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi  
>wrote:
>
>
>Yes . It accepts a xml file as source but not RDD. The XML data embedded  
>inside json is streamed from kafka cluster.  So I could get it as RDD. 
>>Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
>>function  but  performance  wise I am not happy as it takes 4 minutes to 
>>parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 
>>
>>
>>
>>
>>Sent from Samsung Mobile.
>>
>>
>> Original message 
>>From: Felix Cheung  
>>Date:20/08/2016  09:49  (GMT+05:30) 
>>To: Diwakar Dhanuskodi  , user 
>> 
>>Cc: 
>>Subject: Re: Best way to read XML data from RDD 
>>
>>
>>Have you tried
>>
>>https://github.com/databricks/ spark-xml
>>?
>>
>>
>>
>>
>>
>>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
>> wrote:
>>
>>
>>Hi,  
>>
>>
>>There is a RDD with json data. I could read json data using rdd.read.json . 
>>The json data has XML data in couple of key-value paris. 
>>
>>
>>Which is the best method to read and parse XML from rdd. Is there any 
>>specific xml libraries for spark. Could anyone help on this.
>>
>>
>>Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RDD vs Dataset performance

2016-07-28 Thread Darin McBeath
I started playing round with Datasets on Spark 2.0 this morning and I'm 
surprised by the significant performance difference I'm seeing between an RDD 
and a Dataset for a very basic example.


I've defined a simple case class called AnnotationText that has a handful of 
fields.


I create a Dataset[AnnotationText] with my data and repartition(4) this on one 
of the columns and cache the resulting dataset as ds (force the cache by 
executing a count action).  Everything looks good and I have more than 10M 
records in my dataset ds.

When I execute the following:

ds.filter(textAnnotation => textAnnotation.text == 
"mutational".toLowerCase).count 

It consistently finishes in just under 3 seconds.  One of the things I notice 
is that it has 3 stages.  The first stage is skipped (as this had to do with 
creation ds and it was already cached).  The second stage appears to do the 
filtering (requires 4 tasks) but interestingly it shuffles output.  The third 
stage (requires only 1 task) appears to count the results of the shuffle.  

When I look at the cached dataset (on 4 partitions) it is 82.6MB.

I then decided to convert the ds dataset to an RDD as follows, repartition(4) 
and cache.

val aRDD = ds.rdd.repartition(4).cache
aRDD.count
So, I now have an RDD[AnnotationText]

When I execute the following:

aRDD.filter(textAnnotation => textAnnotation.text == 
"mutational".toLowerCase).count

It consistently finishes in just under half a second.  One of the things I 
notice is that it only has 2 stages.  The first stage is skipped (as this had 
to do with creation of aRDD and it was already cached).  The second stage 
appears to do the filtering and count(requires 4 tasks).  Interestingly, there 
is no shuffle (or subsequently 3rd stage).   

When I look at the cached RDD (on 4 partitions) it is 2.9GB.


I was surprised how significant the cached storage difference was between the 
Dataset (82.6MB) and the RDD (2.9GB) version of the same content.  Is this kind 
of difference to be expected?

While I like the smaller size for the Dataset version, I was confused as to why 
the performance for the Dataset version was so much slower (2.5s vs .5s).  I 
suspect it might be attributed to the shuffle and third stage required by the 
Dataset version but I'm not sure. I was under the impression that Datasets 
should (would) be faster in many use cases (such as the one I'm using above).  
Am I doing something wrong or is this to be expected?

Thanks.

Darin.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath
ok, some more information (and presumably a workaround).

when I initial read in my file, I use the following code.

JavaRDD keyFileRDD = sc.textFile(keyFile)

Looking at the UI, this file has 2 partitions (both on the same executor).

I then subsequently repartition this RDD (to 16)

partKeyFileRDD = keyFileRDD.repartition(16)

Looking again at the UI, this file has 16 partitions now (all on the same 
executor). When the forEachPartition runs, this then uses these 16 partitions 
(all on the same executor).  I think this is really the problem.  I'm not sure 
why the repartition didn't spread the partitions across both executors.

When the mapToPair subsequently runs below both executors are used and things 
start falling apart because none of the initialization logic was performed on 
the one executor.

However, if I modify the code above 

JavaRDD keyFileRDD = sc.textFile(keyFile,16)

Then initial keyFileRDD will be in 16 partitions spread across both executors.  
When I execute my forEachPartition directly on keyFileRDD (since there is no 
need to repartition), both executors will now be used (and initialized).

Anyway, don't know if this is my lack of understanding for how repartition 
should work or if this is a bug.  Thanks Jacek for starting to dig into this.

Darin.



- Original Message -
From: Darin McBeath <ddmcbe...@yahoo.com.INVALID>
To: Jacek Laskowski <ja...@japila.pl>
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:57 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor

My driver code has the following:

// Init S3 (workers) so we can read the assets
partKeyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));
// Get the assets.  Create a key pair where the key is asset id and the value 
is the rec.
JavaPairRDD<String,String> seqFileRDD = partKeyFileRDD.mapToPair(new 
SimpleStorageServiceAsset());

The worker then has the following.  The issue I believe is that the following 
log.info statements only appear in the log file for one of my executors (and 
not both).  In other words, when executing the forEachPartition above, Spark 
appears to think all of the partitions are on one executor (at least that is 
the impression I'm left with).  But, when I get to the mapToToPair, Spark 
suddenly begins to use both executors.  I have verified that there are 16 
partitions for partKeyFileRDD.



public class SimpleStorageServiceInit implements VoidFunction<Iterator> 
 {

privateString arg1;
private String arg2;
private String arg3;

public SimpleStorageServiceInit(arg1, String arg2, String arg3) {
this.arg1 = arg1;
this.arg2= arg2;
this.arg3 = arg3;
log.info("SimpleStorageServiceInit constructor");
log.info("SimpleStorageServiceInit constructor arg1: "+ arg1);
log.info("SimpleStorageServiceInit constructor arg2:"+ arg2);
log.info("SimpleStorageServiceInit constructor arg3: "+ arg3);
}

@Override
public void call(Iterator arg) throws Exception {
log.info("SimpleStorageServiceInit call");
log.info("SimpleStorageServiceInit call arg1: "+ arg1);
log.info("SimpleStorageServiceInit call arg2:"+ arg2);
log.info("SimpleStorageServiceInit call arg3: "+ arg3);
SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
}
}


From: Jacek Laskowski <ja...@japila.pl>
To: Darin McBeath <ddmcbe...@yahoo.com> 
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:40 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor



Hi, 
Could you share the code with foreachPartition? 
Jacek 
11.03.2016 7:33 PM "Darin McBeath" <ddmcbe...@yahoo.com> napisał(a):


>
>I can verify this by looking at the log file for the workers.
>
>Since I output logging statements in the object called by the 
>foreachPartition, I can see the statements being logged. Oddly, these output 
>statements only occur in one executor (and not the other).  It occurs 16 times 
>in this executor  since there are 16 partitions.  This seems odd as there are 
>only 8 cores on the executor and the other executor doesn't appear to be 
>called at all in the foreachPartition call.  But, when I go to do a map 
>function on this same RDD then things start blowing up on the other executor 
>as it starts doing work for some partitions (although, it would appear that 
>all partitions were only initialized on the other executor). The executor that 
>was used in the foreachPartition call works fine and doesn't experience issue. 
> But, because the other executor is failing on every request the job dies.
>
>Darin.
>
>
>
>From: Jacek Laskowski <ja...@japila.pl>
>To: Darin McBeath <ddmcbe...@yahoo.com>
>Cc: user <user@spark.apache.org>
>Sent: Friday, Marc

Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath
My driver code has the following:

// Init S3 (workers) so we can read the assets
partKeyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));
// Get the assets.  Create a key pair where the key is asset id and the value 
is the rec.
JavaPairRDD<String,String> seqFileRDD = partKeyFileRDD.mapToPair(new 
SimpleStorageServiceAsset());

The worker then has the following.  The issue I believe is that the following 
log.info statements only appear in the log file for one of my executors (and 
not both).  In other words, when executing the forEachPartition above, Spark 
appears to think all of the partitions are on one executor (at least that is 
the impression I'm left with).  But, when I get to the mapToToPair, Spark 
suddenly begins to use both executors.  I have verified that there are 16 
partitions for partKeyFileRDD.



public class SimpleStorageServiceInit implements VoidFunction<Iterator> 
 {

privateString arg1;
private String arg2;
private String arg3;

public SimpleStorageServiceInit(arg1, String arg2, String arg3) {
this.arg1 = arg1;
this.arg2= arg2;
this.arg3 = arg3;
log.info("SimpleStorageServiceInit constructor");
log.info("SimpleStorageServiceInit constructor arg1: "+ arg1);
log.info("SimpleStorageServiceInit constructor arg2:"+ arg2);
log.info("SimpleStorageServiceInit constructor arg3: "+ arg3);
}

@Override
public void call(Iterator arg) throws Exception {
log.info("SimpleStorageServiceInit call");
log.info("SimpleStorageServiceInit call arg1: "+ arg1);
log.info("SimpleStorageServiceInit call arg2:"+ arg2);
log.info("SimpleStorageServiceInit call arg3: "+ arg3);
SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
}
}


From: Jacek Laskowski <ja...@japila.pl>
To: Darin McBeath <ddmcbe...@yahoo.com> 
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:40 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor



Hi, 
Could you share the code with foreachPartition? 
Jacek 
11.03.2016 7:33 PM "Darin McBeath" <ddmcbe...@yahoo.com> napisał(a):


>
>I can verify this by looking at the log file for the workers.
>
>Since I output logging statements in the object called by the 
>foreachPartition, I can see the statements being logged. Oddly, these output 
>statements only occur in one executor (and not the other).  It occurs 16 times 
>in this executor  since there are 16 partitions.  This seems odd as there are 
>only 8 cores on the executor and the other executor doesn't appear to be 
>called at all in the foreachPartition call.  But, when I go to do a map 
>function on this same RDD then things start blowing up on the other executor 
>as it starts doing work for some partitions (although, it would appear that 
>all partitions were only initialized on the other executor). The executor that 
>was used in the foreachPartition call works fine and doesn't experience issue. 
> But, because the other executor is failing on every request the job dies.
>
>Darin.
>
>
>
>From: Jacek Laskowski <ja...@japila.pl>
>To: Darin McBeath <ddmcbe...@yahoo.com>
>Cc: user <user@spark.apache.org>
>Sent: Friday, March 11, 2016 1:24 PM
>Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
>executor
>
>
>
>Hi,
>How do you check which executor is used? Can you include a screenshot of the 
>master's webUI with workers?
>Jacek
>11.03.2016 6:57 PM "Darin McBeath" <ddmcbe...@yahoo.com.invalid> napisał(a):
>
>I've run into a situation where it would appear that foreachPartition is only 
>running on one of my executors.
>>
>>I have a small cluster (2 executors with 8 cores each).
>>
>>When I run a job with a small file (with 16 partitions) I can see that the 16 
>>partitions are initialized but they all appear to be initialized on only one 
>>executor.  All of the work then runs on this  one executor (even though the 
>>number of partitions is 16). This seems odd, but at least it works.  Not sure 
>>why the other executor was not used.
>>
>>However, when I run a larger file (once again with 16 partitions) I can see 
>>that the 16 partitions are initialized once again (but all on the same 
>>executor).  But, this time subsequent work is now spread across the 2 
>>executors.  This of course results in problems because the other executor was 
>>not initialized as all of the partitions were only initialized on the other 
>>executor.
>>
>>Does anyone have any suggestions for where I might want to investigate?  Has 
>>anyone else seen something like this before?  Any thoughts/insights wou

Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath


I can verify this by looking at the log file for the workers.

Since I output logging statements in the object called by the foreachPartition, 
I can see the statements being logged. Oddly, these output statements only 
occur in one executor (and not the other).  It occurs 16 times in this executor 
 since there are 16 partitions.  This seems odd as there are only 8 cores on 
the executor and the other executor doesn't appear to be called at all in the 
foreachPartition call.  But, when I go to do a map function on this same RDD 
then things start blowing up on the other executor as it starts doing work for 
some partitions (although, it would appear that all partitions were only 
initialized on the other executor). The executor that was used in the 
foreachPartition call works fine and doesn't experience issue.  But, because 
the other executor is failing on every request the job dies.

Darin.



From: Jacek Laskowski <ja...@japila.pl>
To: Darin McBeath <ddmcbe...@yahoo.com> 
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:24 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor



Hi, 
How do you check which executor is used? Can you include a screenshot of the 
master's webUI with workers? 
Jacek 
11.03.2016 6:57 PM "Darin McBeath" <ddmcbe...@yahoo.com.invalid> napisał(a):

I've run into a situation where it would appear that foreachPartition is only 
running on one of my executors.
>
>I have a small cluster (2 executors with 8 cores each).
>
>When I run a job with a small file (with 16 partitions) I can see that the 16 
>partitions are initialized but they all appear to be initialized on only one 
>executor.  All of the work then runs on this  one executor (even though the 
>number of partitions is 16). This seems odd, but at least it works.  Not sure 
>why the other executor was not used.
>
>However, when I run a larger file (once again with 16 partitions) I can see 
>that the 16 partitions are initialized once again (but all on the same 
>executor).  But, this time subsequent work is now spread across the 2 
>executors.  This of course results in problems because the other executor was 
>not initialized as all of the partitions were only initialized on the other 
>executor.
>
>Does anyone have any suggestions for where I might want to investigate?  Has 
>anyone else seen something like this before?  Any thoughts/insights would be 
>appreciated.  I'm using the Stand Alone Cluster manager, cluster started with 
>the spark ec2 scripts  and submitting my job using spark-submit.
>
>Thanks.
>
>Darin.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath
I've run into a situation where it would appear that foreachPartition is only 
running on one of my executors.

I have a small cluster (2 executors with 8 cores each).

When I run a job with a small file (with 16 partitions) I can see that the 16 
partitions are initialized but they all appear to be initialized on only one 
executor.  All of the work then runs on this  one executor (even though the 
number of partitions is 16). This seems odd, but at least it works.  Not sure 
why the other executor was not used.

However, when I run a larger file (once again with 16 partitions) I can see 
that the 16 partitions are initialized once again (but all on the same 
executor).  But, this time subsequent work is now spread across the 2 
executors.  This of course results in problems because the other executor was 
not initialized as all of the partitions were only initialized on the other 
executor.

Does anyone have any suggestions for where I might want to investigate?  Has 
anyone else seen something like this before?  Any thoughts/insights would be 
appreciated.  I'm using the Stand Alone Cluster manager, cluster started with 
the spark ec2 scripts  and submitting my job using spark-submit.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Best practice for retrieving over 1 million files from S3

2016-01-13 Thread Darin McBeath
I'm looking for some suggestions based on other's experiences.

I currently have a job that I need to run periodically where I need to read on 
the order of 1+ million files from an S3 bucket.  It is not the entire bucket 
(nor does it match a pattern).  Instead, I have a list of random keys that are 
'names' for the files in this S3 bucket.  The bucket itself will contain 
upwards of 60M or more files.

My current approach has been to get my list of keys, partition on the key, and 
then map this to an underlying class that uses the most recent AWS SDK to 
retrieve the file from S3 using this key, which then returns the file.  So, in 
the end, I have an RDD.  This works, but I really wonder if this is the 
best way.  I suspect there might be a better/faster way.

One thing I've been considering is passing all of the keys (using s3n: urls) to 
sc.textFile or sc.wholeTextFiles(since some of my files can have embedded 
newlines).  But, I wonder how either of these would behave if I passed 
literally a million (or more) 'filenames'.

Before I spend time exploring, I wanted to seek some input.

Any thoughts would be appreciated.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.6 and Application History not working correctly

2016-01-13 Thread Darin McBeath
I tried using Spark 1.6 in a stand-alone cluster this morning.

I submitted 2 jobs (and they both executed fine).  In fact, they are the exact 
same jobs with just some different parameters.

I was able to view the application history for the first job.

However, when I tried to view the second job, I get the following error message.

Application history not found (app-20160113140054-0001)
No event logs found for application SparkSync Application in 
file:///root/spark/applicationHistory. Did you specify the correct logging 
directory?


Everything works fine with Spark 1.5.  I'm able to view the application history 
for both jobs.

Has anyone else noticed this issue?  Any suggestions? 

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.6 and Application History not working correctly

2016-01-13 Thread Darin McBeath
Thanks.

I already set the following in spark-defaults.conf so I don't think that is 
going to fix my problem.

spark.eventLog.dir file:///root/spark/applicationHistory
spark.eventLog.enabled true


I suspect my problem must be something else.
Darin.


From: Don Drake <dondr...@gmail.com>
To: Darin McBeath <ddmcbe...@yahoo.com> 
Cc: User <user@spark.apache.org>
Sent: Wednesday, January 13, 2016 10:10 AM
Subject: Re: Spark 1.6 and Application History not working correctly



I noticed a similar problem going from 1.5.x to 1.6.0 on YARN.

I resolved it be setting the following command-line parameters:

spark.eventLog.enabled=true

spark.eventLog.dir=


-Don


On Wed, Jan 13, 2016 at 8:29 AM, Darin McBeath <ddmcbe...@yahoo.com.invalid> 
wrote:

I tried using Spark 1.6 in a stand-alone cluster this morning.
>
>I submitted 2 jobs (and they both executed fine).  In fact, they are the exact 
>same jobs with just some different parameters.
>
>I was able to view the application history for the first job.
>
>However, when I tried to view the second job, I get the following error 
>message.
>
>Application history not found (app-20160113140054-0001)
>No event logs found for application SparkSync Application in 
>file:///root/spark/applicationHistory. Did you specify the correct logging 
>directory?
>
>
>Everything works fine with Spark 1.5.  I'm able to view the application 
>history for both jobs.
>
>Has anyone else noticed this issue?  Any suggestions?
>
>Thanks.
>
>Darin.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Turning off DTD Validation using XML Utils package - Spark

2015-12-04 Thread Darin McBeath
ok, a new capability has been added to spark-xml-utils (1.3.0) to address this 
request.

Essentially, the capability to specify 'processor' features has been added 
(through a new getInstance function).  Here is a list of  the features that can 
be set 
(http://www.saxonica.com/html/documentation/javadoc/net/sf/saxon/lib/FeatureKeys.html).
  Since we are leveraging the s9apiProcessor under the covers, features 
relevant to that are the only ones that would make sense to use.

To address your request of completely ignoring the Doctype declaration in the 
xml, you would need to do the following:

import net.sf.saxon.lib.FeatureKeys;
HashMap<String,Object> featureMap = new HashMap<String,Object>();
featureMap.put(FeatureKeys.ENTITY_RESOLVER_CLASS, 
"com.somepackage.IgnoreDoctype");
// The first parameter is the xpath expression
// The second parameter is the hashmap for the namespace mappings (in this case 
there are none)
// The third parameter is the hashmap for the processor features
XPathProcessor proc = XPathProcessor.getInstance("/books/book",null,featureMap);


The following evaluation should now work ok.

proc.evaluateString("Some 
BookSome 
Author200529.99"));
} catch (XPathException e) 


You then would define the following class (and make sure it is included in your 
application)

package com.somepackage;
import java.io.ByteArrayInputStream;
import org.xml.sax.EntityResolver;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

public class IgnoreDoctype implements EntityResolver {

public InputSource resolveEntity(java.lang.String publicId, java.lang.String 
systemId)
 throws SAXException, java.io.IOException
  {
  // Ignore everything
  return new InputSource(new ByteArrayInputStream("".getBytes()));

  }
 

}


Lastly, you will need to include the saxon9he jar file (open source version).

This would work for XPath, XQuery, and XSLT.

Hope this helps.  When I get a chance, I will update the spark-xml-utils github 
site with details on the new getInstance function and some more information on 
the various features.
Darin.



From: Darin McBeath <ddmcbe...@yahoo.com.INVALID>
To: "user@spark.apache.org" <user@spark.apache.org> 
Sent: Tuesday, December 1, 2015 11:51 AM
Subject: Re: Turning off DTD Validation using XML Utils package - Spark




The problem isn't really with DTD validation (by default validation is 
disabled).  The underlying problem is that the DTD can't be found (which is 
indicated in your stack trace below).  The underlying parser will try and 
retrieve the DTD (regardless of  validation) because things such as entities 
could be expressed in the DTD.

I will explore providing access to some of the underlying 'processor' 
configurations.  For example, you could provide your own EntityResolver class 
that could either completely ignore the Doctype declaration (return a 'dummy' 
DTD that is completely empty) or you could have it find 'local' versions (on 
the workers or in S3 and then cache them locally for performance).  

I will post an update when the code has been adjusted.

Darin.

- Original Message -
From: Shivalik <shivalik.malho...@outlook.com>
To: user@spark.apache.org
Sent: Tuesday, December 1, 2015 8:15 AM
Subject: Turning off DTD Validation using XML Utils package - Spark

Hi Team,

I've been using XML Utils library 
(http://spark-packages.org/package/elsevierlabs-os/spark-xml-utils) to parse
XML using XPath in a spark job. One problem I am facing is with the DTDs. My
XML file, has a doctype tag included in it.

I want to turn off DTD validation using this library since I don't have
access to DTD file. Has someone faced this problem before. Please help.

The exception I am getting it is as below:

stage 0.0 (TID 0, localhost):
com.elsevier.spark_xml_utils.xpath.XPathException: I/O error reported by XML
parser processing null: /filename.dtd (No such file or directory)

at
com.elsevier.spark_xml_utils.xpath.XPathProcessor.evaluate(XPathProcessor.java:301)

at
com.elsevier.spark_xml_utils.xpath.XPathProcessor.evaluateString(XPathProcessor.java:219)

at com.thomsonreuters.xmlutils.XMLParser.lambda$0(XMLParser.java:31)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Turning-off-DTD-Validation-using-XML-Utils-package-Spark-tp25534.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e

Re: Turning off DTD Validation using XML Utils package - Spark

2015-12-01 Thread Darin McBeath


The problem isn't really with DTD validation (by default validation is 
disabled).  The underlying problem is that the DTD can't be found (which is 
indicated in your stack trace below).  The underlying parser will try and 
retrieve the DTD (regardless of  validation) because things such as entities 
could be expressed in the DTD.

I will explore providing access to some of the underlying 'processor' 
configurations.  For example, you could provide your own EntityResolver class 
that could either completely ignore the Doctype declaration (return a 'dummy' 
DTD that is completely empty) or you could have it find 'local' versions (on 
the workers or in S3 and then cache them locally for performance).  

I will post an update when the code has been adjusted.

Darin.

- Original Message -
From: Shivalik 
To: user@spark.apache.org
Sent: Tuesday, December 1, 2015 8:15 AM
Subject: Turning off DTD Validation using XML Utils package - Spark

Hi Team,

I've been using XML Utils library 
(http://spark-packages.org/package/elsevierlabs-os/spark-xml-utils) to parse
XML using XPath in a spark job. One problem I am facing is with the DTDs. My
XML file, has a doctype tag included in it.

I want to turn off DTD validation using this library since I don't have
access to DTD file. Has someone faced this problem before. Please help.

The exception I am getting it is as below:

stage 0.0 (TID 0, localhost):
com.elsevier.spark_xml_utils.xpath.XPathException: I/O error reported by XML
parser processing null: /filename.dtd (No such file or directory)

at
com.elsevier.spark_xml_utils.xpath.XPathProcessor.evaluate(XPathProcessor.java:301)

at
com.elsevier.spark_xml_utils.xpath.XPathProcessor.evaluateString(XPathProcessor.java:219)

at com.thomsonreuters.xmlutils.XMLParser.lambda$0(XMLParser.java:31)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Turning-off-DTD-Validation-using-XML-Utils-package-Spark-tp25534.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading xml in java using spark

2015-09-01 Thread Darin McBeath
Another option might be to leverage spark-xml-utils 
(https://github.com/dmcbeath/spark-xml-utils)

This is a collection of xml utilities that I've recently revamped that make it 
relatively easy to use xpath, xslt, or xquery within the context of a Spark 
application (or at least I think so).  My previous attempt was not overly 
friendly, but as I've learned more about Spark (and needed easier to use xml 
utilities) I've hopefully made this easier to use and understand.  I hope 
others find it useful.

Back to your problem.  Assuming you have a bunch of xml records in an RDD, you 
should be able to do something like the following to count the number of 
elements for that type.  In the example below, I'm counting the number of 
references in documents.  The xmlKeyPair is an RDD of type (String,String) 
where the first item is the 'key' and the second item is the xml record.  The 
xpath expression identifies the 'reference' element I want to count.

import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

xmlKeyPair.mapPartitions(recsIter => {
 val xpath = 
"count(/xocs:doc/xocs:meta/xocs:references/xocs:ref-info)"
 val namespaces = new HashMap[String,String](Map(
"xocs" -> 
"http://www.elsevier.com/xml/xocs/dtd;
  ).asJava)
 val proc = XPathProcessor.getInstance(xpath,namespaces)
 recsIter.map(rec => proc.evaluateString(rec._2).toInt)
   }).sum


There is more documentation on the spark-xml-utils github site.  Let me know if 
the documentation is not clear or if you have any questions. 

Darin.



From: Rick Hillegas 
To: Sonal Goyal  
Cc: rakesh sharma ; user@spark.apache.org 
Sent: Monday, August 31, 2015 10:51 AM
Subject: Re: Reading xml in java using spark



Hi Rakesh,

You might also take a look at the Derby code.
   org.apache.derby.vti.XmlVTI provides a number of static methods for
   turning an XML resource into a JDBC ResultSet.

Thanks,
-Rick

On 8/31/15 4:44 AM, Sonal Goyal wrote: 


I think the mahout project had an xmlinoutformat which you can leverage.
>On Aug 31, 2015 5:10 PM, "rakesh sharma"  wrote:
>
>I want to parse an xml file in spark 
>>But as far as example is concerned it reads it as text file. The maping to 
>>xml will be a tedious job.
>>How can I find the number of elements of a particular type using that. Any 
>>help in java/scala code is also welcome
>>
>>
>>thanks
>>rakesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Please add the Cincinnati spark meetup to the list of meet ups

2015-07-07 Thread Darin McBeath
 http://www.meetup.com/Cincinnati-Apache-Spark-Meetup/

Thanks.
Darin.

Running into several problems with Data Frames

2015-04-17 Thread Darin McBeath
I decided to play around with DataFrames this morning but I'm running into 
quite a few issues.  I'm assuming that I must be doing something wrong so would 
appreciate some advice.

First, I create my Data Frame.

import sqlContext.implicits._
case class Entity(InternalId: Long, EntityId: Long, EntityType: String, 
CustomerId: String, EntityURI: String, NumDocs: Long)
val entities = sc.textFile(s3n://darin/Entities.csv)
val entitiesArr = entities.map(v = v.split('|'))
val dfEntity = entitiesArr.map(arr = Entity(arr(0).toLong, arr(1).toLong, 
arr(2), arr(3), arr(4), arr(5).toLong)).toDF()

Second, I verify the schema.

dfEntity.printSchema

root
|-- InternalId: long (nullable = false)
|-- EntityId: long (nullable = false)
|-- EntityType: string (nullable = true)
|-- CustomerId: string (nullable = true)
|-- EntityURI: string (nullable = true)
|-- NumDocs: long (nullable = false)



Third, I verify I can select  a column.

dfEntity.select(InternalId).limit(10).show()

InternalId
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 


But, things then start to break down.  Let's assume I want to filter so I only 
have records where the InternalId is  5.

dfEntity.filter(InternalId  5L).count()

But, this gives me the following error message.  Doesn't the schema above 
indicate the InternalId column should be of type Long?

console:42: error: type mismatch;
found   : Long(5L)
required: String
dfEntity.filter(InternalId  5L).count()


I then try the following

dfEntity.filter(dfEntity(InternalId)  5L).count()

Now, this gives me the following error instead.


org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in 
stage 153.0 failed 4 times, most recent failure: Lost task 13.3 in stage 153.0 
(TID 1636, ip-10-0-200-6.ec2.internal): java.lang.ArrayIndexOutOfBoundsException

I'm using Apache Spark 1.3.  

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



repartitionAndSortWithinPartitions and mapPartitions and sort order

2015-03-12 Thread Darin McBeath
I am using repartitionAndSortWithinPartitions to partition my content and then 
sort within each partition.  I've also created a custom partitioner that I use 
with repartitionAndSortWithinPartitions. I created a custom partitioner as my 
key consist of something like 'groupid|timestamp' and I only want to partition 
on the group id but I want to sort the records on each partition using the 
entire key (groupid and the timestamp).

My question is when I use mapPartitions (to process the records in each 
partition) is whether the order in each partition will be guaranteed (from the 
sort) as I iterate through the records in each partition.  As I iterate, while 
processing the current record I need to look at the previous record and the 
next record in the partition and I need to make sure the records would be 
processed in the sorted order.

I tend to think so, but wanted to confirm.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Question about the spark assembly deployed to the cluster with the ec2 scripts

2015-03-05 Thread Darin McBeath
I've downloaded spark  1.2.0 to my laptop.  In the lib directory, it includes 
spark-assembly-1.2.0-hadoop2.4.0.jar

When I spin up a cluster using the ec2 scripts with 1.2.0 (and set 
--hadoop-major-version=2) I notice that in the lib directory for the 
master/slaves the assembly is for hadoop2.0.0 (and I think Cloudera).

Is there a way that I  can force the install of the same assembly to the 
cluster that comes with the 1.2 download of spark?

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question about Spark best practice when counting records.

2015-02-27 Thread Darin McBeath
Thanks for you quick reply.  Yes, that would be fine.   I would rather wait/use 
the optimal approach as opposed to hacking some one-off solution.

Darin.



From: Kostas Sakellis kos...@cloudera.com
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: User user@spark.apache.org 
Sent: Friday, February 27, 2015 12:19 PM
Subject: Re: Question about Spark best practice when counting records.



Hey Darin,

Record count metrics are coming in Spark 1.3. Can you wait until it is 
released? Or do you need a solution in older versions of spark.

Kostas

On Friday, February 27, 2015, Darin McBeath ddmcbe...@yahoo.com.invalid wrote:



I have a fairly large Spark job where I'm essentially creating quite a few 
RDDs, do several types of joins using these RDDS resulting in a final RDD which 
I write back to S3.


Along the way, I would like to capture record counts for some of these RDDs. 
My initial approach was to use the count action on some of these intermediate  
RDDS (and cache them since the count would force the materialization of the 
RDD and the RDD would be needed again later).  This seemed to work 'ok' when 
my RDDs were fairly small/modest but as they grew in size I started to 
experience problems.

After watching a recent very good screencast on performance, this doesn't seem 
the correct approach as I believe I'm really breaking (or hindering) the 
pipelining concept in Spark.  If I remove all of my  counts, I'm only left 
with the one job/action (save as Hadoop file at the end).  Spark then seems to 
run smoother (and quite a bit faster) and I really don't need (or want) to 
even cache any of my intermediate RDDs.

So, the approach I've been kicking around is to use accumulators instead.  I 
was already using them to count 'bad' records but why not 'good' records as 
well? I realize that if I lose a partition that I might over count, but  
perhaps that is an acceptable trade-off.

I'm guessing that others have ran into this before so I would like to learn 
from the experience of others and how they have addressed this.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Question about Spark best practice when counting records.

2015-02-27 Thread Darin McBeath
I have a fairly large Spark job where I'm essentially creating quite a few 
RDDs, do several types of joins using these RDDS resulting in a final RDD which 
I write back to S3.


Along the way, I would like to capture record counts for some of these RDDs. My 
initial approach was to use the count action on some of these intermediate  
RDDS (and cache them since the count would force the materialization of the RDD 
and the RDD would be needed again later).  This seemed to work 'ok' when my 
RDDs were fairly small/modest but as they grew in size I started to experience 
problems.

After watching a recent very good screencast on performance, this doesn't seem 
the correct approach as I believe I'm really breaking (or hindering) the 
pipelining concept in Spark.  If I remove all of my  counts, I'm only left with 
the one job/action (save as Hadoop file at the end).  Spark then seems to run 
smoother (and quite a bit faster) and I really don't need (or want) to even 
cache any of my intermediate RDDs.

So, the approach I've been kicking around is to use accumulators instead.  I 
was already using them to count 'bad' records but why not 'good' records as 
well? I realize that if I lose a partition that I might over count, but  
perhaps that is an acceptable trade-off.

I'm guessing that others have ran into this before so I would like to learn 
from the experience of others and how they have addressed this.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



job keeps failing with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

2015-02-25 Thread Darin McBeath
I'm using Spark 1.2, stand-alone cluster on ec2 I have a cluster of 8 
r3.8xlarge machines but limit the job to only 128 cores.  I have also tried 
other things such as setting 4 workers per r3.8xlarge and 67gb each but this 
made no difference.

The job frequently fails at the end in this step (saveasHadoopFile).   It will 
sometimes work.

finalNewBaselinePairRDD is hashPartitioned with 1024 partitions and a total 
size around 1TB.  There are about 13.5M records in finalNewBaselinePairRDD.  
finalNewBaselinePairRDD is String,String


JavaPairRDDText, Text finalBaselineRDDWritable = 
finalNewBaselinePairRDD.mapToPair(new 
ConvertToWritableTypes()).persist(StorageLevel.MEMORY_AND_DISK_SER());

// Save to hdfs (gzip)
finalBaselineRDDWritable.saveAsHadoopFile(hdfs:///sparksync/, Text.class, 
Text.class, 
SequenceFileOutputFormat.class,org.apache.hadoop.io.compress.GzipCodec.class); 


If anyone has any tips for what I should look into it would be appreciated.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Darin McBeath
Just to close the loop in case anyone runs into the same problem I had.

By setting --hadoop-major-version=2 when using the ec2 scripts, everything 
worked fine.

Darin.


- Original Message -
From: Darin McBeath ddmcbe...@yahoo.com.INVALID
To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
Cc: user@spark.apache.org user@spark.apache.org
Sent: Monday, February 23, 2015 3:16 PM
Subject: Re: Which OutputCommitter to use for S3?

Thanks.  I think my problem might actually be the other way around.

I'm compiling with hadoop 2,  but when I startup Spark, using the ec2 scripts, 
I don't specify a 
-hadoop-major-version and the default is 1.   I'm guessing that if I make that 
a 2 that it might work correctly.  I'll try it and post a response.


- Original Message -
From: Mingyu Kim m...@palantir.com
To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com
Cc: user@spark.apache.org user@spark.apache.org
Sent: Monday, February 23, 2015 3:06 PM
Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh!

Darin, it¹s likely because the DirectOutputCommitter is compiled with
Hadoop 1 classes and you¹re running it with Hadoop 2.
org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
became an interface in Hadoop 2.

Mingyu





On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote:

Aaron.  Thanks for the class. Since I'm currently writing Java based
Spark applications, I tried converting your class to Java (it seemed
pretty straightforward).

I set up the use of the class as follows:

SparkConf conf = new SparkConf()
.set(spark.hadoop.mapred.output.committer.class,
com.elsevier.common.DirectOutputCommitter);

And I then try and save a file to S3 (which I believe should use the old
hadoop apis).

JavaPairRDDText, Text newBaselineRDDWritable =
reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
Text.class, Text.class, SequenceFileOutputFormat.class,
org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread main java.lang.IncompatibleClassChangeError: Found
class org.apache.hadoop.mapred.JobContext, but interface was expected
at 
com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
java:68)
at 
org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
.scala:1075)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
ala:940)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
ala:902)
at 
org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
71)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of  type
org.apache.hadoop.mapred.JobContext.

Is there something obvious that I might be doing wrong (or messed up in
the translation from Scala to Java) or something I should look into?  I'm
using Spark 1.2 with hadoop 2.4.


Thanks.

Darin.





From: Aaron Davidson ilike...@gmail.com
To: Andrew Ash and...@andrewash.com
Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com;
user@spark.apache.org user@spark.apache.org; Aaron Davidson
aa...@databricks.com
Sent: Saturday, February 21, 2015 7:01 PM
Subject: Re: Which OutputCommitter to use for S3?



Here is the class:
https://urldefense.proofpoint.com/v2/url?u=https-3A__gist.github.com_aaron
dav_c513916e72101bbe14ecd=AwIFaQc=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6o
Onmz8r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84m=_2YAVrYZtQmuKZRf6sFs
zOvl_-ZnxmkBPHo1K24TfGEs=cwSCPKlJO-BJcz4UcGck3xOE2N-4V3eoNvgtFCdMLP8e=

You can use it by setting mapred.output.committer.class in the Hadoop
configuration (or spark.hadoop.mapred.output.committer.class in the
Spark configuration). Note that this only works for the old Hadoop APIs,
I believe the new Hadoop APIs strongly tie committer to input format (so
FileInputFormat always uses FileOutputCommitter), which makes this fix
more difficult to apply.




On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:

Josh is that class something you guys would consider open sourcing, or
would you rather the community step up and create an OutputCommitter
implementation optimized for S3?


On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

We (Databricks) use our own DirectOutputCommitter implementation, which
is a couple tens of lines of Scala code.  The class would almost
entirely be a no-op except we took some care to properly handle the
_SUCCESS file.


On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

I didn¹t get any response. It¹d be really appreciated if anyone using a
special OutputCommitter for S3 can comment on this!


Thanks,
Mingyu


From: Mingyu Kim

Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Darin McBeath
Aaron.  Thanks for the class. Since I'm currently writing Java based Spark 
applications, I tried converting your class to Java (it seemed pretty 
straightforward). 

I set up the use of the class as follows:

SparkConf conf = new SparkConf()
.set(spark.hadoop.mapred.output.committer.class, 
com.elsevier.common.DirectOutputCommitter);

And I then try and save a file to S3 (which I believe should use the old hadoop 
apis).

JavaPairRDDText, Text newBaselineRDDWritable = 
reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, 
Text.class, SequenceFileOutputFormat.class, 
org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread main java.lang.IncompatibleClassChangeError: Found class 
org.apache.hadoop.mapred.JobContext, but interface was expected
at 
com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of  type 
org.apache.hadoop.mapred.JobContext.

Is there something obvious that I might be doing wrong (or messed up in the 
translation from Scala to Java) or something I should look into?  I'm using 
Spark 1.2 with hadoop 2.4.


Thanks.

Darin.





From: Aaron Davidson ilike...@gmail.com
To: Andrew Ash and...@andrewash.com 
Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; 
user@spark.apache.org user@spark.apache.org; Aaron Davidson 
aa...@databricks.com 
Sent: Saturday, February 21, 2015 7:01 PM
Subject: Re: Which OutputCommitter to use for S3?



Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop 
configuration (or spark.hadoop.mapred.output.committer.class in the Spark 
configuration). Note that this only works for the old Hadoop APIs, I believe 
the new Hadoop APIs strongly tie committer to input format (so FileInputFormat 
always uses FileOutputCommitter), which makes this fix more difficult to apply.




On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:

Josh is that class something you guys would consider open sourcing, or would 
you rather the community step up and create an OutputCommitter implementation 
optimized for S3?


On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

We (Databricks) use our own DirectOutputCommitter implementation, which is a 
couple tens of lines of Scala code.  The class would almost entirely be a 
no-op except we took some care to properly handle the _SUCCESS file.


On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

I didn’t get any response. It’d be really appreciated if anyone using a 
special OutputCommitter for S3 can comment on this!


Thanks,
Mingyu


From: Mingyu Kim m...@palantir.com
Date: Monday, February 16, 2015 at 1:15 AM
To: user@spark.apache.org user@spark.apache.org
Subject: Which OutputCommitter to use for S3?



HI all,


The default OutputCommitter used by RDD, which is FileOutputCommitter, seems 
to require moving files at the commit step, which is not a constant 
operation in S3, as discussed in 
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use 
DirectFileOutputCommitter (as mentioned in SPARK-3595), but I wanted to 
check if there is a de facto standard, publicly available OutputCommitter to 
use for S3 in conjunction with Spark.


Thanks,
Mingyu



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Darin McBeath
Thanks.  I think my problem might actually be the other way around.

I'm compiling with hadoop 2,  but when I startup Spark, using the ec2 scripts, 
I don't specify a 
-hadoop-major-version and the default is 1.   I'm guessing that if I make that 
a 2 that it might work correctly.  I'll try it and post a response.


- Original Message -
From: Mingyu Kim m...@palantir.com
To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com
Cc: user@spark.apache.org user@spark.apache.org
Sent: Monday, February 23, 2015 3:06 PM
Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh!

Darin, it¹s likely because the DirectOutputCommitter is compiled with
Hadoop 1 classes and you¹re running it with Hadoop 2.
org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
became an interface in Hadoop 2.

Mingyu





On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote:

Aaron.  Thanks for the class. Since I'm currently writing Java based
Spark applications, I tried converting your class to Java (it seemed
pretty straightforward).

I set up the use of the class as follows:

SparkConf conf = new SparkConf()
.set(spark.hadoop.mapred.output.committer.class,
com.elsevier.common.DirectOutputCommitter);

And I then try and save a file to S3 (which I believe should use the old
hadoop apis).

JavaPairRDDText, Text newBaselineRDDWritable =
reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
Text.class, Text.class, SequenceFileOutputFormat.class,
org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread main java.lang.IncompatibleClassChangeError: Found
class org.apache.hadoop.mapred.JobContext, but interface was expected
at 
com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
java:68)
at 
org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
.scala:1075)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
ala:940)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
ala:902)
at 
org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
71)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of  type
org.apache.hadoop.mapred.JobContext.

Is there something obvious that I might be doing wrong (or messed up in
the translation from Scala to Java) or something I should look into?  I'm
using Spark 1.2 with hadoop 2.4.


Thanks.

Darin.





From: Aaron Davidson ilike...@gmail.com
To: Andrew Ash and...@andrewash.com
Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com;
user@spark.apache.org user@spark.apache.org; Aaron Davidson
aa...@databricks.com
Sent: Saturday, February 21, 2015 7:01 PM
Subject: Re: Which OutputCommitter to use for S3?



Here is the class:
https://urldefense.proofpoint.com/v2/url?u=https-3A__gist.github.com_aaron
dav_c513916e72101bbe14ecd=AwIFaQc=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6o
Onmz8r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84m=_2YAVrYZtQmuKZRf6sFs
zOvl_-ZnxmkBPHo1K24TfGEs=cwSCPKlJO-BJcz4UcGck3xOE2N-4V3eoNvgtFCdMLP8e=

You can use it by setting mapred.output.committer.class in the Hadoop
configuration (or spark.hadoop.mapred.output.committer.class in the
Spark configuration). Note that this only works for the old Hadoop APIs,
I believe the new Hadoop APIs strongly tie committer to input format (so
FileInputFormat always uses FileOutputCommitter), which makes this fix
more difficult to apply.




On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:

Josh is that class something you guys would consider open sourcing, or
would you rather the community step up and create an OutputCommitter
implementation optimized for S3?


On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

We (Databricks) use our own DirectOutputCommitter implementation, which
is a couple tens of lines of Scala code.  The class would almost
entirely be a no-op except we took some care to properly handle the
_SUCCESS file.


On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

I didn¹t get any response. It¹d be really appreciated if anyone using a
special OutputCommitter for S3 can comment on this!


Thanks,
Mingyu


From: Mingyu Kim m...@palantir.com
Date: Monday, February 16, 2015 at 1:15 AM
To: user@spark.apache.org user@spark.apache.org
Subject: Which OutputCommitter to use for S3?



HI all,


The default OutputCommitter used by RDD, which is FileOutputCommitter,
seems to require moving files at the commit step, which is not a
constant operation in S3, as discussed in
https://urldefense.proofpoint.com/v2/url?u=http-3A__mail-2Darchives.apa
che.org_mod-5Fmbox_spark

Incorrect number of records after left outer join (I think)

2015-02-19 Thread Darin McBeath
Consider the following left outer join

potentialDailyModificationsRDD = 
reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new 
HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER());


Below are the record counts for the RDDs involved
Number of records for reducedDailyPairRDD: 2565206
Number of records for baselinePairRDD: 56102812
Number of records for potentialDailyModificationsRDD: 2570115

Below are the partitioners for the RDDs involved.
Partitioner for reducedDailyPairRDD: Some(org.apache.spark.HashPartitioner@400)
Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400)
Partitioner for potentialDailyModificationsRDD: 
Some(org.apache.spark.HashPartitioner@400)


I realize in the above statement that the .partitionBy is probably not needed 
as the underlying RDDs used in the left outer join are already hash partitioned.

My question is how the resulting RDD (potentialDailyModificationsRDD) can end 
up with more records than 
reducedDailyPairRDD.  I would think the number of records in 
potentialDailyModificationsRDD should be 2565206 instead of 2570115.  Am I 
missing something or is this possibly a bug?

I'm using Apache Spark 1.2 on a stand-alone cluster on ec2.  To get the counts 
for the records, I'm using the .count() for the RDD.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do you get the partitioner for an RDD in Java?

2015-02-17 Thread Darin McBeath


Thanks Imran.  That's exactly what I needed to know.

Darin.


From: Imran Rashid iras...@cloudera.com
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: User user@spark.apache.org 
Sent: Tuesday, February 17, 2015 8:35 PM
Subject: Re: How do you get the partitioner for an RDD in Java?



a JavaRDD is just a wrapper around a normal RDD defined in scala, which is 
stored in the rdd field.  You can access everything that way.  The JavaRDD 
wrappers just provide some interfaces that are a bit easier to work with in 
Java.

If this is at all convincing, here's me demonstrating it inside the spark-shell 
(yes its scala, but I'm using the java api)

scala val jsc = new JavaSparkContext(sc)
jsc: org.apache.spark.api.java.JavaSparkContext = 
org.apache.spark.api.java.JavaSparkContext@7d365529

 
scala val data = jsc.parallelize(java.util.Arrays.asList(Array(a, b, c)))
data: org.apache.spark.api.java.JavaRDD[Array[String]] = 
ParallelCollectionRDD[0] at parallelize at console:15

 
scala data.rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None




On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid 
wrote:

In an 'early release' of the Learning Spark book, there is the following 
reference:

In Scala and Java, you can determine how an RDD is partitioned using its 
partitioner property (or partitioner() method in Java)

However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a 
way of getting this information.

I'm curious if anyone has any suggestions for how I might go about finding how 
an RDD is partitioned in a Java program.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



MapValues and Shuffle Reads

2015-02-17 Thread Darin McBeath
In the following code, I read in a large sequence file from S3 (1TB) spread 
across 1024 partitions.  When I look at the job/stage summary, I see about 
400GB of shuffle writes which seems to make sense as I'm doing a hash partition 
on this file.

// Get the baseline input file
JavaPairRDDText,Text hsfBaselinePairRDDReadable = 
sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, 
Text.class, Text.class);

JavaPairRDDString, String hsfBaselinePairRDD = 
hsfBaselinePairRDDReadable.mapToPair(new 
ConvertFromWritableTypes()).partitionBy(new 
HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution) and what I 
find very strange is that when I look at the job/stage summary, I see more than 
340GB of shuffle read.  Why would there be any shuffle read in this step?  I 
would expect there to be little (if any) shuffle reads in this step.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDDString, Long baselinePairRDD = hsfBaselinePairRDD.mapValues(new 
ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());

log.info(Number of baseline records:  + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

Any insights would be appreciated.

I'm using Spark 1.2.0 in a stand-alone cluster.


Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MapValues and Shuffle Reads

2015-02-17 Thread Darin McBeath
Thanks Imran.

I think you are probably correct. I was a bit surprised that there was no 
shuffle read in the initial hash partition step. I will adjust the code as you 
suggest to prove that is the case. 

I have a slightly different question. If I save an RDD to S3 (or some 
equivalent) and this RDD was hash partitioned at the time, do I still need to 
hash partition the RDD again when I read it in? Is there a way that I could 
prevent all of the shuffling (such as providing a hint)? My parts for the RDD 
will be gzipped so they would not be splittable).  In reality, that's what I 
would really want to do in the first place.

Thanks again for your insights.

Darin.




From: Imran Rashid iras...@cloudera.com
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: User user@spark.apache.org 
Sent: Tuesday, February 17, 2015 3:29 PM
Subject: Re: MapValues and Shuffle Reads



Hi Darin,

When you say you see 400GB of shuffle writes from the first code snippet, 
what do you mean?  There is no action in that first set, so it won't do 
anything.  By itself, it won't do any shuffle writing, or anything else for 
that matter.

Most likely, the .count() on your second code snippet is actually causing the 
execution of some of the first snippet as well.  The .partitionBy will result 
in both shuffle writes and shuffle reads, but they aren't set in motion until 
the .count further down the line.  Its confusing b/c the stage boundaries don't 
line up exactly with your RDD variables here.  hsfBaselinePairRDD spans 2 
stages, and baselinePairRDD actually gets merged into the stage above it.

If you do a hsfBaselinePairRDD.count after your first code snippet, and then 
run the second code snippet afterwards, is it more like what you expect?

Imran




On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid 
wrote:

In the following code, I read in a large sequence file from S3 (1TB) spread 
across 1024 partitions.  When I look at the job/stage summary, I see about 
400GB of shuffle writes which seems to make sense as I'm doing a hash partition 
on this file.

// Get the baseline input file
JavaPairRDDText,Text hsfBaselinePairRDDReadable = 
sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, 
Text.class, Text.class);

JavaPairRDDString, String hsfBaselinePairRDD = 
hsfBaselinePairRDDReadable.mapToPair(new 
ConvertFromWritableTypes()).partitionBy(new 
HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution) and what I 
find very strange is that when I look at the job/stage summary, I see more 
than 340GB of shuffle read.  Why would there be any shuffle read in this step? 
 I would expect there to be little (if any) shuffle reads in this step.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDDString, Long baselinePairRDD = hsfBaselinePairRDD.mapValues(new 
ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());

log.info(Number of baseline records:  + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

Any insights would be appreciated.

I'm using Spark 1.2.0 in a stand-alone cluster.


Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How do you get the partitioner for an RDD in Java?

2015-02-17 Thread Darin McBeath
In an 'early release' of the Learning Spark book, there is the following 
reference:

In Scala and Java, you can determine how an RDD is partitioned using its 
partitioner property (or partitioner() method in Java)

However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way 
of getting this information.

I'm curious if anyone has any suggestions for how I might go about finding how 
an RDD is partitioned in a Java program.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Darin McBeath
I've tried various ideas, but I'm really just shooting in the dark.

I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 partitions) 
I'm trying to save off to S3 is approximately 1TB in size (with the partitions 
pretty evenly distributed in size).

I just tried a test to dial back the number of executors on my cluster from 
using the entire cluster (256 cores) down to 128.  Things seemed to get a bit 
farther (maybe) before the wheels started spinning off again.  But, the job 
always fails when all I'm trying to do is save the 1TB file to S3.

I see the following in my master log file.

15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316 because we 
got no heartbeat in 60 seconds
15/01/23 19:01:54 INFO master.Master: Removing worker worker-20150123172316 on 
15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3

For the stage that eventually fails, I see the following summary information.

Summary Metrics for 729 Completed Tasks
Duration 2.5 min 4.8 min 5.5 min 6.3 min 9.2 min 
GC Time   0 ms 0.3 s 0.4 s 0.5 s 5 s 

Shuffle Read (Remote) 309.3 MB 321.7 MB 325.4 MB 329.6 MB 350.6 MB 

So, the max GC was only 5s for 729 completed tasks.  This sounds reasonable.  
As people tend to indicate GC is the reason one loses executors, this does not 
appear to be my case.

Here is a typical snapshot for some completed tasks.  So, you can see that they 
tend to complete in approximately 6 minutes.  So, it takes about 6 minutes to 
write one partition to S3 (a partition being roughly 1 GB)

65  23619   0   SUCCESS ANY 5 /  2015/01/23 18:30:32
5.8 min 0.9 s   344.6 MB 
59  23613   0   SUCCESS ANY 7 /  2015/01/23 18:30:32
6.0 min 0.4 s   324.1 MB 
68  23622   0   SUCCESS ANY 1 /  2015/01/23 18:30:32
5.7 min 0.5 s   329.9 MB 
62  23616   0   SUCCESS ANY 6 /  2015/01/23 18:30:32
5.8 min 0.7 s   326.4 MB 
61  23615   0   SUCCESS ANY 3 /  2015/01/23 18:30:32
5.5 min 1 s 335.7 MB 
64  23618   0   SUCCESS ANY 2 /  2015/01/23 18:30:32
5.6 min 2 s 328.1 MB 

Then towards the end, when things start heading south, I see the following.  
These tasks never complete but you can see that they have taken more than 47 
minutes (so far) before the job finally fails.  Not really sure why.

671 24225   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
672 24226   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
673 24227   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
674 24228   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
675 24229   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
676 24230   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
677 24231   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
678 24232   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
679 24233   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
680 24234   0   RUNNING ANY 1 /  2015/01/23 18:59:17
47 min 
681 24235   0   RUNNING ANY 1 /  2015/01/23 18:59:18
47 min 
682 24236   0   RUNNING ANY 1 /  2015/01/23 18:59:18
47 min 
683 24237   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
684 24238   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
685 24239   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
686 24240   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
687 24241   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
688 24242   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
689 24243   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
690 24244   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
691 24245   0   RUNNING ANY 5 /  2015/01/23 18:59:21
47 min 

What's odd is that even on the same machine (see below) some tasks are still 
completing (in less than 5 minutes) while other tasks on the same machine seem 
to be hung after 46 minutes.  Keep in mind all I'm doing is saving the file to 
S3 so one would think the amount of work per task/partition would be fairly 
equal.

694 24248   0   SUCCESS ANY 0 /  2015/01/23 18:59:32
4.5 min 0.3 s   326.5 MB 
695 24249   0   SUCCESS ANY 0 /  2015/01/23 18:59:32
4.5 min 0.3 s   330.8 MB 
696 24250   0   RUNNING ANY 0 /  2015/01/23 18:59:32
46 min 
697 24251   

Re: Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Darin McBeath
Thanks for the ideas Sven.

I'm using stand-alone cluster (Spark 1.2).
FWIW, I was able to get this running (just now).  This is the first time it's 
worked in probably my last 10 attempts.

In addition to limiting the executors to only 50% of the cluster.  In the 
settings below, I additionally added/changed  the following.  Maybe, I just got 
lucky (although I think not).  Would be good if someone could weigh in and 
agree that these changes are sensible.  I'm also hoping the support for 
placement groups (targeted for 1.3 in the ec2 scripts) will help the situation. 
 All in all, it takes about 45 minutes to write a 1 TB file back to S3 (as 1024 
partitions).


SparkConf conf = new SparkConf()
.setAppName(SparkSync Application)
.set(spark.serializer, org.apache.spark.serializer.KryoSerializer)
.set(spark.rdd.compress,true) 
.set(spark.core.connection.ack.wait.timeout,600) 
.set(spark.akka.timeout,600)// Increased from 300
.set(spark.akka.threads,16) // Added so that default was increased 
from 4 to 16
.set(spark.task.maxFailures,64) // Didn't really matter as I had no 
failures in this run
.set(spark.storage.blockManagerSlaveTimeoutMs,30);



From: Sven Krasser kras...@gmail.com
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: User user@spark.apache.org 
Sent: Friday, January 23, 2015 5:12 PM
Subject: Re: Problems saving a large RDD (1 TB) to S3 as a sequence file



Hey Darin,

Are you running this over EMR or as a standalone cluster? I've had occasional 
success in similar cases by digging through all executor logs and trying to 
find exceptions that are not caused by the application shutdown (but the logs 
remain my main pain point with Spark).

That aside, another explanation could be S3 throttling you due to volume (and 
hence causing write requests to fail). You can try to split your file into 
multiple pieces and store those as S3 objects with different prefixes to make 
sure they end up in different partitions in S3. See here for details: 
http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html.
 If that works, that'll narrow the cause down.

Best,
-Sven






On Fri, Jan 23, 2015 at 12:04 PM, Darin McBeath ddmcbe...@yahoo.com.invalid 
wrote:

I've tried various ideas, but I'm really just shooting in the dark.

I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 
partitions) I'm trying to save off to S3 is approximately 1TB in size (with 
the partitions pretty evenly distributed in size).

I just tried a test to dial back the number of executors on my cluster from 
using the entire cluster (256 cores) down to 128.  Things seemed to get a bit 
farther (maybe) before the wheels started spinning off again.  But, the job 
always fails when all I'm trying to do is save the 1TB file to S3.

I see the following in my master log file.

15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316 because 
we got no heartbeat in 60 seconds
15/01/23 19:01:54 INFO master.Master: Removing worker worker-20150123172316 on
15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3

For the stage that eventually fails, I see the following summary information.

Summary Metrics for 729 Completed Tasks
Duration 2.5 min 4.8 min 5.5 min 6.3 min 9.2 min
GC Time   0 ms 0.3 s 0.4 s 0.5 s 5 s

Shuffle Read (Remote) 309.3 MB 321.7 MB 325.4 MB 329.6 MB 350.6 MB

So, the max GC was only 5s for 729 completed tasks.  This sounds reasonable.  
As people tend to indicate GC is the reason one loses executors, this does not 
appear to be my case.

Here is a typical snapshot for some completed tasks.  So, you can see that 
they tend to complete in approximately 6 minutes.  So, it takes about 6 
minutes to write one partition to S3 (a partition being roughly 1 GB)

65  23619   0   SUCCESS ANY 5 /  2015/01/23 18:30:32   
 5.8 min 0.9 s   344.6 MB
59  23613   0   SUCCESS ANY 7 /  2015/01/23 18:30:32   
 6.0 min 0.4 s   324.1 MB
68  23622   0   SUCCESS ANY 1 /  2015/01/23 18:30:32   
 5.7 min 0.5 s   329.9 MB
62  23616   0   SUCCESS ANY 6 /  2015/01/23 18:30:32   
 5.8 min 0.7 s   326.4 MB
61  23615   0   SUCCESS ANY 3 /  2015/01/23 18:30:32   
 5.5 min 1 s 335.7 MB
64  23618   0   SUCCESS ANY 2 /  2015/01/23 18:30:32   
 5.6 min 2 s 328.1 MB

Then towards the end, when things start heading south, I see the following.  
These tasks never complete but you can see that they have taken more than 47 
minutes (so far) before the job finally fails.  Not really sure why.

671 24225   0   RUNNING ANY 1 /  2015/01/23 18:59:14   
 47 min
672 24226   0   RUNNING ANY 1 /  2015/01/23 18:59:14   
 47 min
673 24227   0   RUNNING ANY 1 /  2015/01/23 18:59

Confused about shuffle read and shuffle write

2015-01-21 Thread Darin McBeath
 I have the following code in a Spark Job.

// Get the baseline input file(s)
JavaPairRDDText,Text hsfBaselinePairRDDReadable = 
sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, 
Text.class, Text.class);
JavaPairRDDString, String hsfBaselinePairRDD = 
hsfBaselinePairRDDReadable.mapToPair(newConvertFromWritableTypes()).partitionBy(newHashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_ONLY_SER());

// Use 'substring' to extract epoch values.
JavaPairRDDString, Long baselinePairRDD = 
hsfBaselinePairRDD.mapValues(newExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_ONLY_SER());

When looking at the STAGE information for my job, I notice the following:

To construct hsfBaselinePairRDD, it takes about 7.5 minutes, with 931GB of 
input (from S3) and 377GB of shuffle write (presumably because of the hash 
partitioning).  This all makes sense.

To construct the baselinePairRDD, it also takes about 7.5 minutes.  I thought 
that was a bit odd.  But what I thought was really odd is why there was also 
330GB of shuffle read in this stage.  I would have thought there should be 0 
shuffle read in this stage.  

What I'm confused about is why there is even any 'shuffle read' when 
constructing the baselinePairRDD.  If anyone could shed some light on this it 
would be appreciated.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Confused about shuffle read and shuffle write

2015-01-20 Thread Darin McBeath
I have the following code in a Spark Job.
 // Get the baseline input file(s) JavaPairRDDText,Text 
hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, 
SequenceFileInputFormat.class, Text.class, Text.class); JavaPairRDDString, 
String hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new 
ConvertFromWritableTypes()).partitionBy(new 
HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_ONLY_SER());
 
 // Use 'substring' to extract epoch values. JavaPairRDDString, Long 
baselinePairRDD = hsfBaselinePairRDD.mapValues(new 
ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_ONLY_SER());
When looking at the STAGE information for my job, I notice the following:
To construct hsfBaselinePairRDD, it takes about 7.5 minutes, with 931GB of 
input (from S3) and 377GB of shuffle write (presumably because of the hash 
partitioning).  This all makes sense.
To construct the baselinePairRDD, it also takes about 7.5 minutes.  I thought 
that was a bit odd.  But what I thought was really odd is why there was also 
330GB of shuffle read in this stage.  I would have thought there should be 0 
shuffle read in this stage.  
What I'm confused about is why there is even any 'shuffle read' when 
constructing the baselinePairRDD.  If anyone could shed some light on this it 
would be appreciated.
Thanks.
Darin.

Re: Please help me get started on Apache Spark

2014-11-20 Thread Darin McBeath
Take a look at the O'Reilly Learning Spark (Early Release) book.  I've found 
this very useful.
Darin.
  From: Saurabh Agrawal saurabh.agra...@markit.com
 To: user@spark.apache.org user@spark.apache.org 
 Sent: Thursday, November 20, 2014 9:04 AM
 Subject: Please help me get started on Apache Spark
   
 !--#yiv9027708365 _filtered #yiv9027708365 {font-family:Calibri;panose-1:2 15 
5 2 2 2 4 3 2 4;}#yiv9027708365 #yiv9027708365 p.yiv9027708365MsoNormal, 
#yiv9027708365 li.yiv9027708365MsoNormal, #yiv9027708365 
div.yiv9027708365MsoNormal 
{margin:0in;margin-bottom:.0001pt;font-size:11.0pt;font-family:Calibri, 
sans-serif;}#yiv9027708365 a:link, #yiv9027708365 
span.yiv9027708365MsoHyperlink 
{color:blue;text-decoration:underline;}#yiv9027708365 a:visited, #yiv9027708365 
span.yiv9027708365MsoHyperlinkFollowed 
{color:purple;text-decoration:underline;}#yiv9027708365 
p.yiv9027708365MsoListParagraph, #yiv9027708365 
li.yiv9027708365MsoListParagraph, #yiv9027708365 
div.yiv9027708365MsoListParagraph 
{margin-top:0in;margin-right:0in;margin-bottom:0in;margin-left:.5in;margin-bottom:.0001pt;font-size:11.0pt;font-family:Calibri,
 sans-serif;}#yiv9027708365 span.yiv9027708365EmailStyle17 
{font-family:Calibri, sans-serif;color:windowtext;}#yiv9027708365 
.yiv9027708365MsoChpDefault {font-family:Calibri, sans-serif;} _filtered 
#yiv9027708365 {margin:1.0in 1.0in 1.0in 1.0in;}#yiv9027708365 
div.yiv9027708365WordSection1 {}#yiv9027708365 _filtered #yiv9027708365 {} 
_filtered #yiv9027708365 {} _filtered #yiv9027708365 {} _filtered 
#yiv9027708365 {} _filtered #yiv9027708365 {} _filtered #yiv9027708365 {} 
_filtered #yiv9027708365 {} _filtered #yiv9027708365 {} _filtered 
#yiv9027708365 {} _filtered #yiv9027708365 {}#yiv9027708365 ol 
{margin-bottom:0in;}#yiv9027708365 ul {margin-bottom:0in;}--   Friends,     I 
am pretty new to Spark as much as to Scala, MLib and the entire Hadoop stack!! 
It would be so much help if I could be pointed to some good books on Spark and 
MLib?    Further, does MLib support any algorithms for B2B cross sell/ upsell 
or customer retention (out of the box preferably) that I could run on my Sales 
force data? I am currently using Collaborative filtering but that’s essentially 
B2C.    Thanks in advance!!    Regards, Saurabh Agrawal 
This e-mail, including accompanying communications and attachments, is strictly 
confidential and only for the intended recipient. Any retention, use or 
disclosure not expressly authorised by Markit is prohibited. This email is 
subject to all waivers and other terms at the following link: 
http://www.markit.com/en/about/legal/email-disclaimer.page

Please visit http://www.markit.com/en/about/contact/contact-us.page? for 
contact information on our offices worldwide.

MarkitSERV Limited has its registered office located at Level 4, Ropemaker 
Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and regulated by 
the Financial Conduct Authority with registration number 207294


  

Confused why I'm losing workers/executors when writing a large file to S3

2014-11-13 Thread Darin McBeath
For one of my Spark jobs, my workers/executors are dying and leaving the 
cluster.

On the master, I see something like the following in the log file.  I'm 
surprised to see the '60' seconds in the master log below because I explicitly 
set it to '600' (or so I thought) in my spark job (see below).   This is 
happening at the end of my job when I'm trying to persist a large RDD (probably 
around 300+GB) back to S3 (in 256 partitions).  My cluster consists of 6 
r3.8xlarge machines.  The job successfully works when I'm outputting 100GB or 
200GB.

If  you have any thoughts/insights, it would be appreciated. 

Thanks.

Darin.

Here is where I'm setting the 'timeout' in my spark job.
SparkConf conf = new SparkConf().setAppName(SparkSync 
Application).set(spark.serializer, 
org.apache.spark.serializer.KryoSerializer).set(spark.rdd.compress,true)  
 .set(spark.core.connection.ack.wait.timeout,600);​
On the master, I see the following in the log file.

4/11/13 17:20:39 WARN master.Master: Removing 
worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 because we got no 
heartbeat in 60 seconds14/11/13 17:20:39 INFO master.Master: Removing worker 
worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 on 
ip-10-35-184-232.ec2.internal:5187714/11/13 17:20:39 INFO master.Master: 
Telling app of lost executor: 2

On a worker, I see something like the following in the log file.

14/11/13 17:20:58 WARN util.AkkaUtils: Error sending message in 1 
attemptsjava.util.concurrent.TimeoutException: Futures timed out after [30 
seconds]  at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)  at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)  at 
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)  at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)  at 
org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)  at 
org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)14/11/13 
17:21:11 INFO httpclient.HttpMethodDirector: I/O exception 
(java.net.SocketException) caught when processing request: Broken pipe14/11/13 
17:21:11 INFO httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:32 
INFO httpclient.HttpMethodDirector: I/O exception (java.net.SocketException) 
caught when processing request: Broken pipe14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when 
processing request: Resetting to invalid mark14/11/13 17:21:34 INFO 
httpclient.HttpMethodDirector: Retrying request14/11/13 17:21:58 WARN 
utils.RestUtils: Retried connection 6 times, which exceeds the maximum retry 
count of 514/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, 
which exceeds the maximum retry count of 514/11/13 17:22:57 WARN 
util.AkkaUtils: Error sending message in 1 
attemptsjava.util.concurrent.TimeoutException: Futures timed out after [30 
seconds]


ec2 script and SPARK_LOCAL_DIRS not created

2014-11-12 Thread Darin McBeath
I'm using spark 1.1 and the provided ec2 scripts to start my cluster 
(r3.8xlarge machines).  From the spark-shell, I can verify that the environment 
variables are set
scala System.getenv(SPARK_LOCAL_DIRS)res0: String = /mnt/spark,/mnt2/spark
However, when I look on the workers, the directories for /mnt/spark and 
/mnt2/spark do not exist.
Am I missing something?  Has anyone else noticed this?
A colleague was started a cluster (using the ec2 scripts) but for m3.xlarge 
machines and both /mnt/spark and /mnt2/spark directories were created.
Thanks.
Darin.



What should be the number of partitions after a union and a subtractByKey

2014-11-11 Thread Darin McBeath
Assume the following where both updatePairRDD and deletePairRDD are both 
HashPartitioned.  Before the union, each one of these has 512 partitions.   The 
new created updateDeletePairRDD has 1024 partitions.  Is this the 
general/expected behavior for a union (the number of partitions to double)?
JavaPairRDDString,String updateDeletePairRDD = 
updatePairRDD.union(deletePairRDD);
Then a similar question for subtractByKey.  In the example below, 
baselinePairRDD is HashPartitioned (with 512 partitions).  We know from above 
that updateDeletePairRDD has 1024 partitions.  The newly created 
workSubtractBaselinePairRDD has 512 partitions.  This makes sense because we 
are only 'subtracting' records from the baselinePairRDD and one wouldn't think 
the number of partitions would increase.  Is this the general/expected behavior 
for a subractByKey?

JavaPairRDDString,String workSubtractBaselinePairRDD = 
baselinePairRDD.subtractByKey(updateDeletePairRDD);



Question about RDD Union and SubtractByKey

2014-11-10 Thread Darin McBeath
I have the following code where I'm using RDD 'union' and 'subtractByKey' to 
create a new baseline RDD.  All of my RDDs are a key pair with the 'key' a 
String and the 'value' a String (xml document).
// **// Merge the daily 
deletes/updates/adds with the baseline// 
** // Concat the Updates, 
Deletes into one PairRDDJavaPairRDDString,String updateDeletePairRDD = 
updatePairRDD.union(deletePairRDD); // Remove the update/delete  keys from the 
baselineJavaPairRDDString,String workSubtractBaselinePairRDD = 
baselinePairRDD.subtractByKey(updateDeletePairRDD); // Add in the 
AddsJavaPairRDDString,String workAddBaselinePairRDD = 
workSubtractBaselinePairRDD.union(addPairRDD);

// Add in the UpdatesJavaPairRDDString,String newBaselinePairRDD = 
workAddBaselinePairRDD.union(updatePairRDD);
When I go to 'count' the newBaselinePairRDD 
// Output count for new baseline log.info(Number of new baseline records:  + 
newBaselinePairRDD.count());
I'm getting the following exception (the above log.info is SparkSync.java:785). 
 What I find odd is the reference to spark sql.  So, I'm curious as to whether 
under the covers the RDD union and/or subtractByKey are implemented as spark 
sql. I wouldn't think so but thought I would ask.  I'm also suspicious to the 
reference to the '' and whether that is because of my xml document in the 
value portion of the key pair.  Any insights would be appreciated.  If there 
are thoughts for how to better approach my problem (even debugging), I would be 
interested in that as well.  The updatePairRDD, deletePairRDD, baselinePairRDD, 
addPairRDD, and updateDeletePairRDD are all 'hashPartitioned'.
It's also a bit difficult to trace things because my application is a 'java' 
application and the stack references a lot of scala and very few references to 
my application other than one (SparkSync.java:785).  My application is using 
Spark SQL for some other tasks so perhaps an RDD (part) is being re-calculated 
and is resulting in this error.  But, based on other logging statements 
throughout my application, I don't believe this is the case.
Thanks.
Darin.
14/11/10 22:35:27 INFO scheduler.DAGScheduler: Failed to run count at 
SparkSync.java:78514/11/10 22:35:27 WARN scheduler.TaskSetManager: Lost task 
0.3 in stage 40.0 (TID 10674, ip-10-149-76-35.ec2.internal): 
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('' (code 
60)): expected a valid value (number, String, array, object, 'true', 'false' or 
'null') at [Source: java.io.StringReader@e8f759e; line: 1, column: 2]        
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)     
   
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557)
        
com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475)
        
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1415)
        
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679)
        
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024)
        
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
        
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)   
     
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:275)
        
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:274)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)        
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)        
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62)
        
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
        
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)        
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)        
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)        
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)        
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)        
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)        
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)        
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)        
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)        
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)        
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)      
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)    
    org.apache.spark.scheduler.Task.run(Task.scala:54)        

Cincinnati, OH Meetup for Apache Spark

2014-11-03 Thread Darin McBeath
Let me know if you  are interested in participating in a meet up in Cincinnati, 
OH to discuss Apache Spark.
We currently have 4-5 different companies expressing interest but would like a 
few more.
Darin.

XML Utilities for Apache Spark

2014-10-29 Thread Darin McBeath
I developed the spark-xml-utils library because we have a large amount of XML 
in big datasets and I felt this data could be better served by providing some 
helpful xml utilities. This includes the ability to filter documents based on 
an xpath/xquery expression, return specific nodes for an xpath/xquery 
expression, or transform documents using an xquery or an xslt stylesheet. By 
providing some basic wrappers to Saxon-HE, the spark-xml-utils library exposes 
some basic xpath, xslt, and xquery functionality that can readily be leveraged 
by any Spark application (including the spark-shell).  We want to share this 
library with the community and are making it available under the Apache 2.0 
license.
For point of reference, I was able to parse and apply a fairly complex xpath 
expression against 2 million documents (130GB total and 75KB/doc average) in 
less than 3 minute on an AWS cluster (at spot price) costing less than $1/hr.  
When I have a chance, I will blog/write about some of my other investigations 
when using spark-xml-utils.
More about the project is available on 
github(https://github.com/elsevierlabs/spark-xml-utils).  There are examples 
for usage from the spark-shell as well as from a Java application.  Feel free 
to use, contribute, and/or let us know how this library can be improved.  Let 
me know if you have any questions.
Darin.


Spark SQL and confused about number of partitions/tasks to do a simple join.

2014-10-29 Thread Darin McBeath
I have a SchemaRDD with 100 records in 1 partition.  We'll call this baseline.
I have a SchemaRDD with 11 records in 1 partition.  We'll call this daily.
After a fairly basic join of these two tables
JavaSchemaRDD results = sqlContext.sql(SELECT id, action, daily.epoch, 
daily.version FROM baseline, daily  WHERE key=id AND action='u' AND daily.epoch 
 baseline.epoch).cache();

I get a new SchemaRDD results with only 6 records (and the RDD has 200 
partitions).  When the job runs, I can see that 200 tasks were used to do this 
join.  Does this make sense? I'm currently not doing anything special along the 
lines of partitioning (such as hash).  Even if 200 tasks would have been 
required, since the result is only 6 (shouldn't some of these empty partitions 
been 'deleted').
I'm using Apache Spark 1.1 and I'm running this in local mode (localhost[1]).
Any insight would be appreciated.
Thanks.
Darin.



Re: Spark SQL and confused about number of partitions/tasks to do a simple join.

2014-10-29 Thread Darin McBeath
ok. after reading some documentation, it would appear the issue is the default 
number of partitions for a join (200).
After doing something like the following, I was able to change the value.

  From: Darin McBeath ddmcbe...@yahoo.com.INVALID
 To: User user@spark.apache.org 
 Sent: Wednesday, October 29, 2014 1:55 PM
 Subject: Spark SQL and confused about number of partitions/tasks to do a 
simple join.
   
I have a SchemaRDD with 100 records in 1 partition.  We'll call this baseline.
I have a SchemaRDD with 11 records in 1 partition.  We'll call this daily.
After a fairly basic join of these two tables
JavaSchemaRDD results = sqlContext.sql(SELECT id, action, daily.epoch, 
daily.version FROM baseline, daily  WHERE key=id AND action='u' AND daily.epoch 
 baseline.epoch).cache();

I get a new SchemaRDD results with only 6 records (and the RDD has 200 
partitions).  When the job runs, I can see that 200 tasks were used to do this 
join.  Does this make sense? I'm currently not doing anything special along the 
lines of partitioning (such as hash).  Even if 200 tasks would have been 
required, since the result is only 6 (shouldn't some of these empty partitions 
been 'deleted').
I'm using Apache Spark 1.1 and I'm running this in local mode (localhost[1]).
Any insight would be appreciated.
Thanks.
Darin.



  

Re: Spark SQL and confused about number of partitions/tasks to do a simple join.

2014-10-29 Thread Darin McBeath
Sorry, hit the send key a bitt too early.
Anyway, this is the code I set.
sqlContext.sql(set spark.sql.shuffle.partitions=10);
  From: Darin McBeath ddmcbe...@yahoo.com
 To: Darin McBeath ddmcbe...@yahoo.com; User user@spark.apache.org 
 Sent: Wednesday, October 29, 2014 2:47 PM
 Subject: Re: Spark SQL and confused about number of partitions/tasks to do a 
simple join.
   
ok. after reading some documentation, it would appear the issue is the default 
number of partitions for a join (200).
After doing something like the following, I was able to change the value.

 

 From: Darin McBeath ddmcbe...@yahoo.com.INVALID
 To: User user@spark.apache.org 
 Sent: Wednesday, October 29, 2014 1:55 PM
 Subject: Spark SQL and confused about number of partitions/tasks to do a 
simple join.
   
I have a SchemaRDD with 100 records in 1 partition.  We'll call this baseline.
I have a SchemaRDD with 11 records in 1 partition.  We'll call this daily.
After a fairly basic join of these two tables
JavaSchemaRDD results = sqlContext.sql(SELECT id, action, daily.epoch, 
daily.version FROM baseline, daily  WHERE key=id AND action='u' AND daily.epoch 
 baseline.epoch).cache();

I get a new SchemaRDD results with only 6 records (and the RDD has 200 
partitions).  When the job runs, I can see that 200 tasks were used to do this 
join.  Does this make sense? I'm currently not doing anything special along the 
lines of partitioning (such as hash).  Even if 200 tasks would have been 
required, since the result is only 6 (shouldn't some of these empty partitions 
been 'deleted').
I'm using Apache Spark 1.1 and I'm running this in local mode (localhost[1]).
Any insight would be appreciated.
Thanks.
Darin.



   

  

what's the best way to initialize an executor?

2014-10-23 Thread Darin McBeath
I have some code that I only need to be executed once per executor in my spark 
application.  My current approach is to do something like the following:
scala xmlKeyPair.foreachPartition(i = XPathProcessor.init(ats, 
Namespaces/NamespaceContext))
So, If I understand correctly, the XPathProcessor.init will be called once per 
partition.  Since I have 48 partitions for this RDD and 2 million documents, 
that seems acceptable.  The downside is that I likely will have fewer executors 
than 48 (each executor will handle more than 1 partition) so the executor would 
be called more than once with XPathProcessor.init.  I have code in place to 
make sure this is not an issue.  But, I was wondering if there is a better way 
to accomplish something like this.
Thanks.
Darin.



confused about memory usage in spark

2014-10-22 Thread Darin McBeath
I have a PairRDD of type String,String which I persist to S3 (using the 
following code).
JavaPairRDDText, Text aRDDWritable = aRDD.mapToPair(new 
ConvertToWritableTypes());aRDDWritable.saveAsHadoopFile(outputFile, Text.class, 
Text.class, SequenceFileOutputFormat.class);
class ConvertToWritableTypes implements PairFunctionTuple2String, String, 
Text, Text {  public Tuple2Text, Text call(Tuple2String, String record) {  
return new Tuple2(new Text(record._1), new Text(record._2));    } }
When I look at the S3 reported size for say one of the parts (part-0) it 
indicates the size is 156MB.

I then bring up a spark-shell and load this part-0 and cache it. 
scala val keyPair = 
sc.sequenceFile[String,String](s3n://somebucket/part-0).cache()

After execution an action for the above RDD to force the cache, I look at the 
storage (using the Application UI) and it show that I'm using 297MB for this 
RDD (when it was only 156MB in S3).  I get that there could be some differences 
between the serialized storage format and what is then used in memory, but I'm 
curious as to whether I'm missing something and/or should be doing things 
differently.
Thanks.
Darin.

Disabling log4j in Spark-Shell on ec2 stopped working on Wednesday (Oct 8)

2014-10-10 Thread Darin McBeath
For weeks, I've been using the following trick to successfully disable log4j in 
the spark-shell when running a cluster on ec2 that was started by the Spark 
provided ec2 scripts.

cp ./conf/log4j.properties.template ./conf/log4j.properties


I then change log4j.rootCategory=INFO to log4j.rootCategory=WARN.

This all stopped working on Wednesday when I could no longer successfully start 
a cluster on ec2 (using the Spark provided ec2 scripts).  I noticed the 
resolution to this problem was a script referenced by the ec2 scripts had been 
changed (and that this referenced script has since been reverted).  I raise 
this as I don't know if this is a symptom of my problem and that it's 
interesting the problems started happening at the same time.

When I now start up the cluster on ec2 and subsequently start the spark-shell I 
can no longer disable the log4j messages using the above trick.  I'm using 
Apache Spark 1.1.0.

What's interesting is that I can start the cluster locally on my laptop (using 
Spark 1.1.0) and the above trick for disabling log4j in the spark-shell works.  
So, the issue appears to be related to ec2 and potentially something referenced 
by the Spark provided ec2 startup script.  But, that is purely a guess on my 
part.

I'm wondering if anyone else has noticed this issue and if so has a workaround.

Thanks.

Darin.


How to pass env variables from master to executors within spark-shell

2014-08-20 Thread Darin McBeath
Can't seem to figure this out.  I've tried several different approaches without 
success. For example, I've tried setting spark.executor.extraJavaOptions in the 
spark-default.conf (prior to starting the spark-shell) but this seems to have 
no effect.

Outside of spark-shell (within a java application I wrote), I successfully do 
the following:

// Set environment variables for the executors
conf.setExecutorEnv(AWS_ACCESS_KEY_ID, System.getenv(AWS_ACCESS_KEY_ID));
conf.setExecutorEnv(AWS_SECRET_ACCESS_KEY, 
System.getenv(AWS_SECRET_ACCESS_KEY));


But, because my SparkContext already exists within spark-shell, this really 
isn't an option (unless I'm missing something).  

Thanks.

Darin.

Issues with S3 client library and Apache Spark

2014-08-15 Thread Darin McBeath
I've seen a couple of issues posted about this, but I never saw a resolution.

When I'm using Spark 1.0.2 (and the spark-submit script to submit my jobs) and 
AWS SDK 1.8.7, I get the stack trace below.  However, if I drop back to AWS SDK 
1.3.26 (or anything from the AWS SDK 1.4.* family) then everything works fine.  
It would appear that after AWS SDK 1.4, there became a dependency on HTTP 
Client 4.2 (instead of 4.1).  I would like to use the more recent versions of 
the AWS SDK (and not use something nearly 2 years old) so I'm curious whether 
anyone has figured out a workaround to this problem.

Thanks.

Darin.

java.lang.NoSuchMethodError: 
org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:114)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:99)
at 
com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29)
at 
com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:97)
at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:181)
at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:119)
at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:408)
at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:390)
at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:374)
at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:313)
at com.elsevier.s3.SimpleStorageService.clinit(SimpleStorageService.java:27)
at com.elsevier.spark.XMLKeyPair.call(SDKeyMapKeyPairRDD.java:75)
at com.elsevier.spark.XMLKeyPair.call(SDKeyMapKeyPairRDD.java:65)
at 
org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
at 
org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:779)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:769)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)

Should the memory of worker nodes be constrained to the size of the master node?

2014-08-14 Thread Darin McBeath
I started up a cluster on EC2 (using the provided scripts) and specified a 
different instance type for the master and the the worker nodes.  The cluster 
started fine, but when I looked at the cluster (via port 8080), it showed that 
the amount of memory available to the worker nodes did not match the instance 
type I had specified.  Instead, the amount of memory for the worker nodes 
matched the master node.  I did verify that the correct instance types had been 
started for the master and worker nodes.

Curious as to whether this is expected behavior or if this might be a bug?

Thanks.

Darin.

Is there any interest in handling XML within Spark ?

2014-08-13 Thread Darin McBeath
I've been playing around with Spark off and on for the past month and have 
developed some XML helper utilities that enable me to filter an XML dataset as 
well as transform an XML dataset (we have a lot of XML content).  I'm posting 
this email to see if there would be any interest in this effort (as I would be 
happy to place the code in a public git repo) and/or to see if there is already 
something in place that already provides this capability (so I'm not wasting my 
time).  Under the covers, I'm leverage Saxon-HE. I'll first discuss the 
'filtering' aspect.

Assuming you have already created a PairRDD (with the key being the identifier 
for the XML document, and the value being the actual XML document), you could 
easily do the following from the spark-shell to filter this Pair RDD based on 
an arbitrary XPath expression.

## Start the spark-shell (and copy the jar file to executors)
root@ip-10-233-73-204 spark]$ ./bin/spark-shell --jars 
lib/uber-SparkUtils-0.1.jar

## Bring in the sequence file (2 million records)
scala val xmlKeyPair = 
sc.sequenceFile[String,String](s3n://darin/xml/part*).cache()

## Test values against an xpath expression (need to import the the class from 
my jar)
scala import com.darin.xml.XPathEvaluation
scala val resultsRDD = xmlKeyPair.filter{case(k,v)  = 
XPathEvaluation.evaluateString(v, /doc/meta[year='2010'])}

## Save the results as a hadoop sequence file 
scala resultsRDD.saveAsSequenceFile(s3n:/darin/xml/results)

## Do more xpath expressions to create more filtered datasets, etc.

In my case, the initial PairRDD is about 130GB.  With 2 million documents, this 
implies an average of around 65KB per document.  On a small 3 node AWS cluster 
(m3.2xlarge) the above will execute in around 10 minutes.   I currently use 
spot instances (.08/hr each) so this is very economical.

More complex XPath expressions could be used.  

Assume a sample record structure of the following

person gender=male
age32/age
hobbies
hobbytennis/hobby
hobbygolf/hobby
hobbyprogramming/hobby
/hobbies
name
given-nameDarin/given-name
surnameMcBeath/surname
/name
address
street8000 Park Lake Dr/street
cityMason/city
stateOhio/state
/address
/person
The following XPath expressions could be used.

// Exact match where the surname equals 'McBeath'
exists(/person/name[surname='McBeath'])

// Exact match where the person gender attribute equals 'male'
exists(/person[@gender='male'])
// Where the person age is between 30 and 40
exists(/person[(xs:integer(age) = 30) and (xs:integer(age) = 40)])
// Exact match (after lower-case conversion) where the surname equals 'mcbeath'
exists(/person/name[lower-case(string-join(surname/text(),' '))='mcbeath'])
// Exact match (after lower-case conversion) where within a name a surname 
equals 'mcbeath' and given-name equals 'darin'
exists(/person[name[lower-case(string-join(surname/text(),' '))='mcbeath' and 
lower-case(string-join(given-name/text(),' '))='darin']])
// Exact match (after lower-case conversion) where within a name a surname 
equals 'mcbeath' and given-name equals 'darin' or 'darby'
exists(/person[name[lower-case(string-join(surname/text(),' '))='mcbeath' and 
lower-case(string-join(given-name/text(),' '))=('darin','darby')]])
// Search/Token match (after lower-case conversion) where an immediate text 
node(s) of street contains the token 'lake'
exists(/person/address[tokenize(lower-case(string-join(street/text(),' 
')),'\\W+') = 'lake'])
// Search/Token match (after lower-case conversion) where any text node 
descendant of person contains the token 'lake'
exists(/person[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = 
'lake']))); 

// Search/Token 'wildcard' match (after lower-case conversion) where an 
immediate text node(s) of street contains the token matching the wildcard 
expression 'pa*'
exists(/person/address[(for $i in 
tokenize(lower-case(string-join(street/text(),' ')),'\\W+') return 
matches($i,'pa.+')) = true()])


// Search/Token 'wildcard' match (after lower-case conversion) where an 
immediate text node(s) of street contains the token matching the wildcard 
expression 'pa?k'
exists(/person/address[(for $i in 
tokenize(lower-case(string-join(street/text(),' ')),'\\W+') return 
matches($i,'pa.?k')) = true()])
// Exact match were first hobby is 'tennis' 
exists(/person/hobbies/hobby[position() = 1 and ./text() = 'tennis'])

// Exact match were first or second hobby is 'tennis' 
exists(/person/hobbies/hobby[position() = 2 and ./text() = 'tennis'])
// Exact match where the state does not equal 'Ohio'
not(exists(//state[.='Ohio']))
// Search/Token match (after lower-case conversion) where any text node 
descendant of person contains the phrase 'park lake'
exists(/person[matches(string-join(tokenize(lower-case(string-join(.//text(),' 
')),'\\W+'),' '), 'park lake')]))); 


This is very much work in progress.  But, I'm curious as to whether there is 
interest in the Spark community for something like this.  I have also done 
something 

Re: Number of partitions and Number of concurrent tasks

2014-07-31 Thread Darin McBeath
Ok, I set the number of spark worker instances to 2 (below is my startup 
command).  But, this essentially had the effect of increasing my number of 
workers from 3 to 6 (which was good) but it also reduced my number of cores per 
worker from 8 to 4 (which was not so good).  In the end, I would still only be 
able to concurrently process 24 partitions in parallel.  I'm starting a 
stand-alone cluster using the spark provided ec2 scripts .  I tried setting the 
env variable for SPARK_WORKER_CORES in the spark_ec2.py but this had no effect. 
So, it's not clear if I could even set the SPARK_WORKER_CORES with the ec2 
scripts.  Anyway, not sure if there is anything else I can try but at least 
wanted to document what I did try and the net effect.  I'm open to any 
suggestions/advice.

 ./spark-ec2 -k key -i key.pem --hadoop-major-version=2 launch -s 3 -t 
m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2 
my-cluster




 From: Daniel Siegmann daniel.siegm...@velos.io
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: Daniel Siegmann daniel.siegm...@velos.io; user@spark.apache.org 
user@spark.apache.org 
Sent: Thursday, July 31, 2014 10:04 AM
Subject: Re: Number of partitions and Number of concurrent tasks
 


I haven't configured this myself. I'd start with setting SPARK_WORKER_CORES to 
a higher value, since that's a bit simpler than adding more workers. This 
defaults to all available cores according to the documentation, so I'm not 
sure if you can actually set it higher. If not, you can get around this by 
adding more worker instances; I believe simply setting SPARK_WORKER_INSTANCES 
to 2 would be sufficient.

I don't think you have to set the cores if you have more workers - it will 
default to 8 cores per worker (in your case). But maybe 16 cores per node will 
be too many. You'll have to test. Keep in mind that more workers means more 
memory and such too, so you may need to tweak some other settings downward in 
this case.

On a side note: I've read some people found performance was better when they 
had more workers with less memory each, instead of a single worker with tons of 
memory, because it cut down on garbage collection time. But I can't speak to 
that myself.

In any case, if you increase the number of cores available in your cluster 
(whether per worker, or adding more workers per node, or of course adding more 
nodes) you should see more tasks running concurrently. Whether this will 
actually be faster probably depends mainly on whether the CPUs in your nodes 
were really being fully utilized with the current number of cores.




On Wed, Jul 30, 2014 at 8:30 PM, Darin McBeath ddmcbe...@yahoo.com wrote:

Thanks.


So to make sure I understand.  Since I'm using a 'stand-alone' cluster, I 
would set SPARK_WORKER_INSTANCES to something like 2 (instead of the default 
value of 1).  Is that correct?  But, it also sounds like I need to explicitly 
set a value for SPARKER_WORKER_CORES (based on what the documentation states). 
 What would I want that value to be based on my configuration below?  Or, 
would I leave that alone?




 From: Daniel Siegmann daniel.siegm...@velos.io
To: user@spark.apache.org; Darin McBeath ddmcbe...@yahoo.com 
Sent: Wednesday, July 30, 2014 5:58 PM
Subject: Re: Number of partitions and Number of concurrent tasks
 


This is correct behavior. Each core can execute exactly one task at a time, 
with each task corresponding to a partition. If your cluster only has 24 
cores, you can only run at most 24 tasks at once.

You could run multiple workers per node to get more executors. That would give 
you more cores in the cluster. But however many cores you have, each core will 
run only one task at a time.




On Wed, Jul 30, 2014 at 3:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote:

I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.


I have an RDDString which I've repartitioned so it has 100 partitions 
(hoping to increase the parallelism).


When I do a transformation (such as filter) on this RDD, I can't  seem to get 
more than 24 tasks (my total number of cores across the 3 nodes) going at one 
point in time.  By tasks, I mean the number of tasks that appear under the 
Application UI.  I tried explicitly setting the spark.default.parallelism to 
48 (hoping I would get 48 tasks concurrently running) and verified this in 
the Application UI for the running application but this had no effect.  
Perhaps, this is ignored for a 'filter' and the default is the total number 
of cores available.


I'm fairly new with Spark so maybe I'm just missing or misunderstanding 
something fundamental.  Any help would be appreciated.


Thanks.


Darin.




-- 

Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning


440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.iow: www.velos.io




-- 

Daniel Siegmann, Software Developer
Velos
Accelerating Machine

Number of partitions and Number of concurrent tasks

2014-07-30 Thread Darin McBeath
I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.

I have an RDDString which I've repartitioned so it has 100 partitions (hoping 
to increase the parallelism).

When I do a transformation (such as filter) on this RDD, I can't  seem to get 
more than 24 tasks (my total number of cores across the 3 nodes) going at one 
point in time.  By tasks, I mean the number of tasks that appear under the 
Application UI.  I tried explicitly setting the spark.default.parallelism to 48 
(hoping I would get 48 tasks concurrently running) and verified this in the 
Application UI for the running application but this had no effect.  Perhaps, 
this is ignored for a 'filter' and the default is the total number of cores 
available.

I'm fairly new with Spark so maybe I'm just missing or misunderstanding 
something fundamental.  Any help would be appreciated.

Thanks.

Darin.


Re: Number of partitions and Number of concurrent tasks

2014-07-30 Thread Darin McBeath
Thanks.

So to make sure I understand.  Since I'm using a 'stand-alone' cluster, I would 
set SPARK_WORKER_INSTANCES to something like 2 (instead of the default value of 
1).  Is that correct?  But, it also sounds like I need to explicitly set a 
value for SPARKER_WORKER_CORES (based on what the documentation states).  What 
would I want that value to be based on my configuration below?  Or, would I 
leave that alone?



 From: Daniel Siegmann daniel.siegm...@velos.io
To: user@spark.apache.org; Darin McBeath ddmcbe...@yahoo.com 
Sent: Wednesday, July 30, 2014 5:58 PM
Subject: Re: Number of partitions and Number of concurrent tasks
 


This is correct behavior. Each core can execute exactly one task at a time, 
with each task corresponding to a partition. If your cluster only has 24 cores, 
you can only run at most 24 tasks at once.

You could run multiple workers per node to get more executors. That would give 
you more cores in the cluster. But however many cores you have, each core will 
run only one task at a time.




On Wed, Jul 30, 2014 at 3:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote:

I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.


I have an RDDString which I've repartitioned so it has 100 partitions 
(hoping to increase the parallelism).


When I do a transformation (such as filter) on this RDD, I can't  seem to get 
more than 24 tasks (my total number of cores across the 3 nodes) going at one 
point in time.  By tasks, I mean the number of tasks that appear under the 
Application UI.  I tried explicitly setting the spark.default.parallelism to 
48 (hoping I would get 48 tasks concurrently running) and verified this in the 
Application UI for the running application but this had no effect.  Perhaps, 
this is ignored for a 'filter' and the default is the total number of cores 
available.


I'm fairly new with Spark so maybe I'm just missing or misunderstanding 
something fundamental.  Any help would be appreciated.


Thanks.


Darin.




-- 

Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning


440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.iow: www.velos.io