Re: EDI (Electronic Data Interchange) parser on Spark
I'm not familiar with EDI, but perhaps one option might be spark-xml-utils (https://github.com/elsevierlabs-os/spark-xml-utils). You could transform the XML to the XML format required by the xml-to-json function and then return the json. Spark-xml-utils wraps the open source Saxon project and supports XPath, XQuery, and XSLT. Spark-xml-utils doesn't parallelize the parsing of an individual document, but if you have your documents split across a cluster, the processing can be parallelized. We use this package extensively within our company to process millions of XML records. If you happen to be attending Spark Summit in a few months, someone will be presenting on this topic (https://databricks.com/session/mining-the-worlds-science-large-scale-data-matching-and-integration-from-xml-corpora).

Below is a snippet for xquery:

    let $retval :=
      {$doi} {$cid} {$pii} {$content-type} {$srctitle}
      {$document-type} {$document-subtype} {$publication-date}
      {$article-title} {$issn} {$isbn} {$lang} {$tables}
    return xml-to-json($retval)

Darin.

On Tuesday, March 13, 2018, 8:52:42 AM EDT, Aakash Basu wrote:

Hi Jörn,

Thanks for the quick revert. I already built an EDI-to-JSON parser from scratch using the 811 and 820 standard mapping documents. It can run on any standard and for any type of EDI. But my build is in native Python and doesn't leverage Spark's parallel processing, which I want to do for large amounts of EDI data. Any pointers on that?

Thanks,
Aakash.

On Tue, Mar 13, 2018 at 3:44 PM, Jörn Franke wrote:

Maybe there are commercial ones. You could also use some of the open source parsers for XML. However, XML is very inefficient and you need to do a lot of tricks to make it run in parallel. This also depends on the type of EDI message, etc. Sophisticated unit testing and performance testing is key. Nevertheless, it is also not as difficult as I made it sound now.

> On 13. Mar 2018, at 10:36, Aakash Basu wrote:
>
> Hi,
>
> Did anyone build a parallel and large-scale X12 EDI parser to XML or JSON using Spark?
>
> Thanks,
> Aakash.
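On the parallelization question above, one low-friction pattern is to leave an existing per-document parser untouched and let Spark map it over the documents. A minimal Scala sketch, where parseEdi is a hypothetical stand-in for the real 811/820 parser (the input path and partition count are made up):

```scala
// Hypothetical stand-in for an existing single-document EDI parser.
// X12 segments are terminated by "~", so this sketch just counts the
// segments and emits a trivial JSON string.
def parseEdi(doc: String): String = {
  val segments = doc.split("~").count(_.nonEmpty)
  s"""{"segments": $segments}"""
}

// With a SparkContext, the same per-document function parallelizes as-is:
//   val json = sc.textFile("s3://bucket/edi/", 64).map(parseEdi)
```

Each document is parsed independently, so the parallelism comes entirely from how the documents are partitioned across the cluster, not from parallelizing the parser itself.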
How to find the partitioner for a Dataset
I have a Dataset (om) which I created and repartitioned (and cached) using one of the fields (docId). Reading the Spark documentation, I would assume the om Dataset should be hash partitioned. But how can I verify this?

When I do

    om.rdd.partitioner

I get

    Option[org.apache.spark.Partitioner] = None

I thought I would have seen HashPartitioner, but perhaps this is not equivalent. The reason I ask is that when I use this cached Dataset in a join with another Dataset (partitioned on the same column and cached), I see things like the following in my explain, which makes me think the Dataset might have lost the partitioner. I also see a couple of stages for the job where it seems like each Dataset in my join is being read in and shuffled out again (I'm assuming for the hash partitioning required by the join):

    Exchange hashpartitioning(_1#6062.docId, 8)

Any thoughts/ideas would be appreciated.

Thanks.

Darin.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-the-partitioner-for-a-Dataset-tp27672.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
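The mismatch described above can be reproduced in a few lines. A sketch, assuming a local Spark 2.x session (the Om case class and data are invented): the None is expected, because converting a Dataset to an RDD does not carry the SQL-level hash partitioning over as an RDD Partitioner; the partitioning lives in the physical plan instead.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").appName("partitioner-check").getOrCreate()
import spark.implicits._

case class Om(docId: String, value: Int)

// Repartition by docId and cache, as described above.
val om = Seq(Om("a", 1), Om("b", 2), Om("c", 3)).toDS.repartition(8, $"docId").cache()
om.count()                      // materialize the cache

// The RDD view reports no Partitioner: the hash partitioning is a property
// of the SQL physical plan, not of the underlying RDD.
val p = om.rdd.partitioner
om.explain()                    // the plan shows Exchange hashpartitioning(docId, 8)
```

So checking om.rdd.partitioner will always disappoint for a Dataset; the explain output is where the partitioning is actually visible.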
Dataset Filter performance - trying to understand
I've been trying to understand the performance of Datasets (and filters) in Spark 2.0. I have a Dataset which I've read from a parquet file and cached into memory (deserialized). This is spread across 8 partitions and consumes a total of 826MB of memory on my cluster. I verified that the dataset was 100% cached in memory by looking at the Spark UI. I'm using an AWS c3.2xlarge for my 1 worker (8 cores). There are 108,587,678 total records in my cached dataset (om).

I run the following command (against this cached Dataset) and it takes 13.56s:

    om.filter(textAnnotation => textAnnotation.annotType == "ce:para").count

This returns a count of 1,039,993.

When I look at the explain() for this query, I see the following:

    == Physical Plan ==
    *Filter <function1>.apply
    +- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [<function1>.apply]
       +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
          +- Exchange hashpartitioning(docId#394, 8)
    ...

I was a bit perplexed why this takes so long, as I had read that Spark could filter 1B rows a second on a single CPU. Granted, my row is likely more complex, but I thought it should be faster than 13+ seconds to read 100M rows that had been cached into memory.

So, I modified the above query to the following:

    om.filter("annotType == 'ce:para'").count

The query now completes in just over 1s (a huge improvement). When I do the explain plan for this query, I see the following:

    == Physical Plan ==
    *Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))
    +- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [isnotnull(annotType#396), (annotType#396 = ce:para)]
       +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
          +- Exchange hashpartitioning(docId#394, 8)

This is very similar to the first, with the notable exception of

    *Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))

instead of

    *Filter <function1>.apply

I'm guessing the improved performance is because the TextAnnotation object must be created in the first example (and not the second), although this is not clear from the explain plans. Is that correct? Or is there some other reason why the second approach is significantly faster? I would really like to get a solid understanding of why the second query is so much faster.

I also want to clarify whether the InMemoryTableScan and InMemoryRelation are part of the whole-stage code generation. I'm thinking they aren't, as they aren't prefixed by a "*". If not, is there something I could do to make this part of whole-stage code generation? My goal is to make the above operation as fast as possible. I could of course increase the partitions (and the size of my cluster), but I also want to clarify my understanding of whole-stage code generation.

Any thoughts/suggestions would be appreciated. Also, if anyone has found good resources that further explain the details of the DAG and whole-stage code generation, I would appreciate those as well.

Thanks.

Darin.
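For comparison, the Column-based form of the same predicate behaves like the string expression: the lambda is the expensive part, since every row must be deserialized into the case class before the typed predicate can run, while expression filters operate on the binary rows via codegen. A small sketch, assuming a local Spark 2.x session (the TextAnnotation data here is invented):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").appName("filter-compare").getOrCreate()
import spark.implicits._

case class TextAnnotation(docId: String, annotType: String)
val om = Seq(
  TextAnnotation("d1", "ce:para"),
  TextAnnotation("d1", "ce:title"),
  TextAnnotation("d2", "ce:para")).toDS.cache()
om.count()   // materialize the cache

// Typed filter: each row is deserialized into a TextAnnotation
// before the lambda runs (the *Filter <function1>.apply plan).
val typedCount = om.filter(ta => ta.annotType == "ce:para").count

// Column filter: the predicate stays in the expression world, like the
// string form, and benefits from whole-stage codegen.
val columnCount = om.filter($"annotType" === "ce:para").count
```

Both return the same answer; the difference only shows up in the physical plan and the runtime.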
Re: Best way to read XML data from RDD
Yes, you can use it for single-line XML or even multi-line XML. In our typical mode of operation, we have sequence files (where the value is the XML). We then run operations over the XML to extract certain values or to transform the XML into another format (such as json).

If I understand your question, your content is in json, and some of the values within this json are XML strings. You should be able to use spark-xml-utils to parse this string and filter/evaluate the result of an xpath expression (or xquery/xslt). One limitation of spark-xml-utils when using the evaluate operation is that it returns a string. So, you have to be a little creative when returning multiple values (such as delimiting the values with a special character and then splitting on this delimiter).

Darin.

From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
To: Darin McBeath <ddmcbe...@yahoo.com>; Hyukjin Kwon <gurwls...@gmail.com>; Jörn Franke <jornfra...@gmail.com>
Cc: Felix Cheung <felixcheun...@hotmail.com>; user <user@spark.apache.org>
Sent: Monday, August 22, 2016 6:53 AM
Subject: Re: Best way to read XML data from RDD

Hi Darin,

Are you using this utility to parse single-line XML?

Sent from Samsung Mobile.

-------- Original message --------
From: Darin McBeath <ddmcbe...@yahoo.com>
Date: 21/08/2016 17:44 (GMT+05:30)
To: Hyukjin Kwon <gurwls...@gmail.com>, Jörn Franke <jornfra...@gmail.com>
Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, Felix Cheung <felixcheun...@hotmail.com>, user <user@spark.apache.org>
Subject: Re: Best way to read XML data from RDD

Another option would be to look at spark-xml-utils. We use this extensively in the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils

There are quite a few examples. Depending on your preference (and what you want to do), you could use xpath, xquery, or xslt to transform, extract, or filter. As mentioned below, you want to initialize the parser in a mapPartitions call (one of the examples shows this).

Hope this is helpful.

Darin.

From: Hyukjin Kwon <gurwls...@gmail.com>
To: Jörn Franke <jornfra...@gmail.com>
Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>; Felix Cheung <felixcheun...@hotmail.com>; user <user@spark.apache.org>
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD

Hi Diwakar,

The Spark XML library can take an RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend taking care of the creation and destruction of the parser. If the parser is not serializable, then you can do the creation for each partition within mapPartitions, just like https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325

I hope this is helpful.

2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:

I fear the issue is that this will create and destroy an XML parser object 2 million times, which is very inefficient; it does not really look like a parser performance issue. Can't you do something about the format choice? Ask your supplier to deliver another format (ideally avro or something like this)?

>Otherwise you could just create one XML parser object per node, but sharing this among the parallel tasks on the same node is tricky.
>The other possibility could be simply more hardware...
>
>On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>
>Yes. It accepts an XML file as source but not an RDD. The XML data embedded inside json is streamed from a kafka cluster, so I could get it as an RDD.
>>Right now I am using the spark.xml XML.loadstring method inside an RDD map function, but performance-wise I am not happy, as it takes 4 minutes to parse XML from 2 million messages in a 3-node environment (100G, 4 CPUs each).
>>
>>Sent from Samsung Mobile.
>>
>>-------- Original message --------
>>From: Felix Cheung <felixcheun...@hotmail.com>
>>Date: 20/08/2016 09:49 (GMT+05:30)
>>To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user <user@spark.apache.org>
>>Cc:
>>Subject: Re: Best way to read XML data from RDD
>>
>>Have you tried
>>https://github.com/databricks/spark-xml
>>?
>>
>>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com> wrote:
>>
>>Hi,
>>
>>There is an RDD with json data. I could read the json data using rdd.read.json. The json data has XML data in a couple of key-value pairs.
>>
>>Which is the best method to read and parse the XML from the rdd? Are there any specific XML libraries for Spark? Could anyone help on this?
>>
>>Thanks.
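The parser-lifecycle advice above (build the parser once per partition inside mapPartitions, not once per record) can be sketched without any Spark dependency; SimpleParser here is an invented stand-in for an expensive, non-serializable XML parser:

```scala
// Invented stand-in for an expensive, non-serializable parser (e.g. a Saxon
// processor). The "parse" just strips tags for illustration.
class SimpleParser {
  def parse(xml: String): String = xml.replaceAll("<[^>]+>", "").trim
}

var parsersBuilt = 0

// The shape of the function handed to rdd.mapPartitions: one parser per
// partition of records, reused for every record in that partition.
def processPartition(records: Iterator[String]): Iterator[String] = {
  parsersBuilt += 1
  val parser = new SimpleParser
  records.map(parser.parse)
}

// Two pretend partitions stand in for rdd.mapPartitions(processPartition).
val partitions = Seq(Seq("<a>x</a>", "<b>y</b>"), Seq("<c>z</c>"))
val out = partitions.flatMap(p => processPartition(p.iterator))
```

With three records in two partitions, the parser is constructed twice rather than three times; at 2 million records per the thread above, that difference is the whole ballgame.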
RDD vs Dataset performance
I started playing around with Datasets on Spark 2.0 this morning and I'm surprised by the significant performance difference I'm seeing between an RDD and a Dataset for a very basic example.

I've defined a simple case class called AnnotationText that has a handful of fields. I create a Dataset[AnnotationText] with my data, repartition(4) this on one of the columns, and cache the resulting dataset as ds (forcing the cache by executing a count action). Everything looks good and I have more than 10M records in my dataset ds.

When I execute the following:

    ds.filter(textAnnotation => textAnnotation.text == "mutational".toLowerCase).count

it consistently finishes in just under 3 seconds. One of the things I notice is that it has 3 stages. The first stage is skipped (as this had to do with the creation of ds and it was already cached). The second stage appears to do the filtering (requires 4 tasks) but, interestingly, it shuffles output. The third stage (requires only 1 task) appears to count the results of the shuffle. When I look at the cached dataset (on 4 partitions) it is 82.6MB.

I then decided to convert the ds dataset to an RDD as follows, repartition(4), and cache:

    val aRDD = ds.rdd.repartition(4).cache
    aRDD.count

So, I now have an RDD[AnnotationText]. When I execute the following:

    aRDD.filter(textAnnotation => textAnnotation.text == "mutational".toLowerCase).count

it consistently finishes in just under half a second. One of the things I notice is that it only has 2 stages. The first stage is skipped (as this had to do with the creation of aRDD and it was already cached). The second stage appears to do the filtering and count (requires 4 tasks). Interestingly, there is no shuffle (or subsequent 3rd stage). When I look at the cached RDD (on 4 partitions) it is 2.9GB.

I was surprised how significant the cached storage difference was between the Dataset (82.6MB) and the RDD (2.9GB) version of the same content. Is this kind of difference to be expected?

While I like the smaller size for the Dataset version, I was confused as to why the performance of the Dataset version was so much slower (2.5s vs .5s). I suspect it might be attributed to the shuffle and third stage required by the Dataset version, but I'm not sure. I was under the impression that Datasets should (would) be faster in many use cases (such as the one I'm using above). Am I doing something wrong, or is this to be expected?

Thanks.

Darin.
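One hedged reading of the extra stage: a Dataset count is planned as an aggregation, so each partition produces a partial count and a final single-task stage sums the partials after a shuffle, whereas RDD.count returns each partition's count directly to the driver. A Spark-free sketch of the two shapes:

```scala
// Three pretend partitions of cached records.
val partitions = Seq(Seq("a", "b"), Seq("c"), Seq.empty[String])

// Dataset-style count: a partial count per partition (the 4-task second
// stage in the post above), then one final task that sums the partials
// after a shuffle (the 1-task third stage).
val partialCounts = partitions.map(_.size.toLong)
val datasetStyleTotal = partialCounts.sum

// RDD-style count: the same per-partition counts, but they are returned
// straight to the driver and summed there, so no shuffle stage appears.
val rddStyleTotal = partitions.map(_.size.toLong).sum
```

The storage gap (82.6MB vs 2.9GB) comes from the cache format: the Dataset cache stores compact encoded columns, while the RDD cache stores full deserialized JVM objects.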
Re: spark 1.6 foreachPartition only appears to be running on one executor
ok, some more information (and presumably a workaround).

When I initially read in my file, I use the following code:

    JavaRDD<String> keyFileRDD = sc.textFile(keyFile);

Looking at the UI, this file has 2 partitions (both on the same executor). I then subsequently repartition this RDD (to 16):

    partKeyFileRDD = keyFileRDD.repartition(16);

Looking again at the UI, this RDD now has 16 partitions (all on the same executor). When the foreachPartition runs, it then uses these 16 partitions (all on the same executor). I think this is really the problem. I'm not sure why the repartition didn't spread the partitions across both executors. When the mapToPair subsequently runs below, both executors are used and things start falling apart, because none of the initialization logic was performed on the one executor.

However, if I modify the code above to

    JavaRDD<String> keyFileRDD = sc.textFile(keyFile, 16);

then the initial keyFileRDD will be in 16 partitions spread across both executors. When I execute my foreachPartition directly on keyFileRDD (since there is no need to repartition), both executors will now be used (and initialized).

Anyway, I don't know if this is my lack of understanding of how repartition should work or if this is a bug. Thanks Jacek for starting to dig into this.

Darin.

----- Original Message -----
From: Darin McBeath <ddmcbe...@yahoo.com.INVALID>
To: Jacek Laskowski <ja...@japila.pl>
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:57 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one executor

My driver code has the following:

    // Init S3 (workers) so we can read the assets
    partKeyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));

    // Get the assets. Create a key pair where the key is asset id and the value is the rec.
    JavaPairRDD<String,String> seqFileRDD = partKeyFileRDD.mapToPair(new SimpleStorageServiceAsset());

The worker then has the following. The issue I believe is that the log.info statements below only appear in the log file for one of my executors (and not both). In other words, when executing the foreachPartition above, Spark appears to think all of the partitions are on one executor (at least that is the impression I'm left with). But, when I get to the mapToPair, Spark suddenly begins to use both executors. I have verified that there are 16 partitions for partKeyFileRDD.

    public class SimpleStorageServiceInit implements VoidFunction<Iterator<String>> {

        private String arg1;
        private String arg2;
        private String arg3;

        public SimpleStorageServiceInit(String arg1, String arg2, String arg3) {
            this.arg1 = arg1;
            this.arg2 = arg2;
            this.arg3 = arg3;
            log.info("SimpleStorageServiceInit constructor");
            log.info("SimpleStorageServiceInit constructor arg1: " + arg1);
            log.info("SimpleStorageServiceInit constructor arg2: " + arg2);
            log.info("SimpleStorageServiceInit constructor arg3: " + arg3);
        }

        @Override
        public void call(Iterator<String> arg) throws Exception {
            log.info("SimpleStorageServiceInit call");
            log.info("SimpleStorageServiceInit call arg1: " + arg1);
            log.info("SimpleStorageServiceInit call arg2: " + arg2);
            log.info("SimpleStorageServiceInit call arg3: " + arg3);
            SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
        }
    }

From: Jacek Laskowski <ja...@japila.pl>
To: Darin McBeath <ddmcbe...@yahoo.com>
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:40 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one executor

Hi,

Could you share the code with foreachPartition?

Jacek

11.03.2016 7:33 PM "Darin McBeath" <ddmcbe...@yahoo.com> wrote:
>
>I can verify this by looking at the log file for the workers.
>
>Since I output logging statements in the object called by the foreachPartition, I can see the statements being logged. Oddly, these output statements only occur in one executor (and not the other). It occurs 16 times in this executor since there are 16 partitions. This seems odd, as there are only 8 cores on the executor and the other executor doesn't appear to be called at all in the foreachPartition call. But, when I go to do a map function on this same RDD, then things start blowing up on the other executor as it starts doing work for some partitions (although, it would appear that all partitions were only initialized on the other executor). The executor that was used in the foreachPartition call works fine and doesn't experience issues. But, because the other executor is failing on every request, the job dies.
>
>Darin.
Re: spark 1.6 foreachPartition only appears to be running on one executor
I can verify this by looking at the log file for the workers.

Since I output logging statements in the object called by the foreachPartition, I can see the statements being logged. Oddly, these output statements only occur in one executor (and not the other). It occurs 16 times in this executor since there are 16 partitions. This seems odd, as there are only 8 cores on the executor and the other executor doesn't appear to be called at all in the foreachPartition call. But, when I go to do a map function on this same RDD, then things start blowing up on the other executor as it starts doing work for some partitions (although, it would appear that all partitions were only initialized on the other executor). The executor that was used in the foreachPartition call works fine and doesn't experience issues. But, because the other executor is failing on every request, the job dies.

Darin.

From: Jacek Laskowski <ja...@japila.pl>
To: Darin McBeath <ddmcbe...@yahoo.com>
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:24 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one executor

Hi,

How do you check which executor is used? Can you include a screenshot of the master's webUI with workers?

Jacek

11.03.2016 6:57 PM "Darin McBeath" <ddmcbe...@yahoo.com.invalid> wrote:

I've run into a situation where it would appear that foreachPartition is only running on one of my executors.
>
>I have a small cluster (2 executors with 8 cores each).
>
>When I run a job with a small file (with 16 partitions) I can see that the 16 partitions are initialized, but they all appear to be initialized on only one executor. All of the work then runs on this one executor (even though the number of partitions is 16). This seems odd, but at least it works. Not sure why the other executor was not used.
>
>However, when I run a larger file (once again with 16 partitions) I can see that the 16 partitions are initialized once again (but all on the same executor). But, this time subsequent work is now spread across the 2 executors. This of course results in problems, because the other executor was not initialized, as all of the partitions were only initialized on the other executor.
>
>Does anyone have any suggestions for where I might want to investigate? Has anyone else seen something like this before? Any thoughts/insights would be appreciated. I'm using the Stand Alone Cluster manager, cluster started with the spark-ec2 scripts, and submitting my job using spark-submit.
>
>Thanks.
>
>Darin.
spark 1.6 foreachPartition only appears to be running on one executor
I've run into a situation where it would appear that foreachPartition is only running on one of my executors.

I have a small cluster (2 executors with 8 cores each).

When I run a job with a small file (with 16 partitions) I can see that the 16 partitions are initialized, but they all appear to be initialized on only one executor. All of the work then runs on this one executor (even though the number of partitions is 16). This seems odd, but at least it works. Not sure why the other executor was not used.

However, when I run a larger file (once again with 16 partitions) I can see that the 16 partitions are initialized once again (but all on the same executor). But, this time subsequent work is now spread across the 2 executors. This of course results in problems, because the other executor was not initialized, as all of the partitions were only initialized on the other executor.

Does anyone have any suggestions for where I might want to investigate? Has anyone else seen something like this before? Any thoughts/insights would be appreciated. I'm using the Stand Alone Cluster manager, cluster started with the spark-ec2 scripts, and submitting my job using spark-submit.

Thanks.

Darin.
Best practice for retrieving over 1 million files from S3
I'm looking for some suggestions based on others' experiences.

I currently have a job that I need to run periodically where I need to read on the order of 1+ million files from an S3 bucket. It is not the entire bucket (nor does it match a pattern). Instead, I have a list of random keys that are 'names' for the files in this S3 bucket. The bucket itself will contain upwards of 60M or more files.

My current approach has been to get my list of keys, partition on the key, and then map this to an underlying class that uses the most recent AWS SDK to retrieve the file from S3 using this key, which then returns the file. So, in the end, I have an RDD. This works, but I really wonder if this is the best way. I suspect there might be a better/faster way.

One thing I've been considering is passing all of the keys (using s3n: URLs) to sc.textFile or sc.wholeTextFiles (since some of my files can have embedded newlines). But I wonder how either of these would behave if I passed literally a million (or more) 'filenames'. Before I spend time exploring, I wanted to seek some input.

Any thoughts would be appreciated.

Darin.
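On the sc.textFile idea above: both sc.textFile and sc.wholeTextFiles accept a comma-separated list of paths, so the key list can be joined into a single path argument. A sketch (the bucket name and scheme are illustrative only, and how the driver copes with a million paths in one string would still need to be measured):

```scala
// Build one comma-separated path string from a list of S3 keys.
// sc.textFile / sc.wholeTextFiles accept such a list as a single argument.
def toPathList(bucket: String, keys: Seq[String]): String =
  keys.map(k => s"s3n://$bucket/$k").mkString(",")

// With a SparkContext (hypothetical bucket and keys):
//   val files = sc.wholeTextFiles(toPathList("my-bucket", keys))
```

wholeTextFiles has the added advantage of keeping each file as one (path, content) record, which sidesteps the embedded-newline concern.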
Spark 1.6 and Application History not working correctly
I tried using Spark 1.6 in a stand-alone cluster this morning. I submitted 2 jobs (and they both executed fine). In fact, they are the exact same jobs with just some different parameters. I was able to view the application history for the first job. However, when I tried to view the second job, I get the following error message.

Application history not found (app-20160113140054-0001)
No event logs found for application SparkSync Application in file:///root/spark/applicationHistory. Did you specify the correct logging directory?

Everything works fine with Spark 1.5. I'm able to view the application history for both jobs. Has anyone else noticed this issue? Any suggestions? Thanks. Darin.
Re: Spark 1.6 and Application History not working correctly
Thanks. I already set the following in spark-defaults.conf so I don't think that is going to fix my problem.

spark.eventLog.dir file:///root/spark/applicationHistory
spark.eventLog.enabled true

I suspect my problem must be something else. Darin.

From: Don Drake <dondr...@gmail.com> To: Darin McBeath <ddmcbe...@yahoo.com> Cc: User <user@spark.apache.org> Sent: Wednesday, January 13, 2016 10:10 AM Subject: Re: Spark 1.6 and Application History not working correctly

I noticed a similar problem going from 1.5.x to 1.6.0 on YARN. I resolved it by setting the following command-line parameters: spark.eventLog.enabled=true spark.eventLog.dir= -Don

On Wed, Jan 13, 2016 at 8:29 AM, Darin McBeath <ddmcbe...@yahoo.com.invalid> wrote: I tried using Spark 1.6 in a stand-alone cluster this morning. > >I submitted 2 jobs (and they both executed fine). In fact, they are the exact >same jobs with just some different parameters. > >I was able to view the application history for the first job. > >However, when I tried to view the second job, I get the following error >message. > >Application history not found (app-20160113140054-0001) >No event logs found for application SparkSync Application in >file:///root/spark/applicationHistory. Did you specify the correct logging >directory? > > >Everything works fine with Spark 1.5. I'm able to view the application >history for both jobs. > >Has anyone else noticed this issue? Any suggestions? > >Thanks. > >Darin. -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ https://twitter.com/dondrake 800-733-2143
Re: Turning off DTD Validation using XML Utils package - Spark
ok, a new capability has been added to spark-xml-utils (1.3.0) to address this request. Essentially, the capability to specify 'processor' features has been added (through a new getInstance function). Here is a list of the features that can be set (http://www.saxonica.com/html/documentation/javadoc/net/sf/saxon/lib/FeatureKeys.html). Since we are leveraging the s9api Processor under the covers, features relevant to that are the only ones that would make sense to use. To address your request of completely ignoring the Doctype declaration in the xml, you would need to do the following:

import net.sf.saxon.lib.FeatureKeys;

HashMap<String,Object> featureMap = new HashMap<String,Object>();
featureMap.put(FeatureKeys.ENTITY_RESOLVER_CLASS, "com.somepackage.IgnoreDoctype");
// The first parameter is the xpath expression
// The second parameter is the hashmap for the namespace mappings (in this case there are none)
// The third parameter is the hashmap for the processor features
XPathProcessor proc = XPathProcessor.getInstance("/books/book", null, featureMap);

The following evaluation should now work ok.

proc.evaluateString("<books><book><title>Some Book</title><author>Some Author</author><year>2005</year><price>29.99</price></books>");

You then would define the following class (and make sure it is included in your application):

package com.somepackage;

import java.io.ByteArrayInputStream;
import org.xml.sax.EntityResolver;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

public class IgnoreDoctype implements EntityResolver {
    public InputSource resolveEntity(java.lang.String publicId, java.lang.String systemId)
            throws SAXException, java.io.IOException {
        // Ignore everything: hand the parser an empty stream instead of the real DTD
        return new InputSource(new ByteArrayInputStream("".getBytes()));
    }
}

Lastly, you will need to include the saxon9he jar file (open source version). This would work for XPath, XQuery, and XSLT. Hope this helps.
When I get a chance, I will update the spark-xml-utils github site with details on the new getInstance function and some more information on the various features. Darin. From: Darin McBeath <ddmcbe...@yahoo.com.INVALID> To: "user@spark.apache.org" <user@spark.apache.org> Sent: Tuesday, December 1, 2015 11:51 AM Subject: Re: Turning off DTD Validation using XML Utils package - Spark The problem isn't really with DTD validation (by default validation is disabled). The underlying problem is that the DTD can't be found (which is indicated in your stack trace below). The underlying parser will try and retrieve the DTD (regardless of validation) because things such as entities could be expressed in the DTD. I will explore providing access to some of the underlying 'processor' configurations. For example, you could provide your own EntityResolver class that could either completely ignore the Doctype declaration (return a 'dummy' DTD that is completely empty) or you could have it find 'local' versions (on the workers or in S3 and then cache them locally for performance). I will post an update when the code has been adjusted. Darin. - Original Message - From: Shivalik <shivalik.malho...@outlook.com> To: user@spark.apache.org Sent: Tuesday, December 1, 2015 8:15 AM Subject: Turning off DTD Validation using XML Utils package - Spark Hi Team, I've been using XML Utils library (http://spark-packages.org/package/elsevierlabs-os/spark-xml-utils) to parse XML using XPath in a spark job. One problem I am facing is with the DTDs. My XML file, has a doctype tag included in it. I want to turn off DTD validation using this library since I don't have access to DTD file. Has someone faced this problem before. Please help. 
The exception I am getting is as below:

stage 0.0 (TID 0, localhost): com.elsevier.spark_xml_utils.xpath.XPathException: I/O error reported by XML parser processing null: /filename.dtd (No such file or directory)
at com.elsevier.spark_xml_utils.xpath.XPathProcessor.evaluate(XPathProcessor.java:301)
at com.elsevier.spark_xml_utils.xpath.XPathProcessor.evaluateString(XPathProcessor.java:219)
at com.thomsonreuters.xmlutils.XMLParser.lambda$0(XMLParser.java:31)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Turning-off-DTD-Validation-using-XML-Utils-package-Spark-tp25534.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
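As a self-contained check of the IgnoreDoctype idea from this thread, the sketch below uses only the JDK's SAX parser (no spark-xml-utils; the sample document and class name are my own illustration). DefaultHandler already implements EntityResolver, so overriding resolveEntity to return an empty stream lets a document whose DOCTYPE points at a missing DTD parse cleanly.

```java
import java.io.ByteArrayInputStream;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

public class IgnoreDoctypeDemo {

    // Parse the xml and count <book> elements, ignoring any external DTD.
    static int countBooks(String xml) throws Exception {
        final int[] books = {0};
        SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
        DefaultHandler handler = new DefaultHandler() {
            @Override
            public InputSource resolveEntity(String publicId, String systemId) {
                // The parser asks for the external DTD here (even with validation
                // off); hand back an empty DTD instead of failing on a missing file.
                return new InputSource(new ByteArrayInputStream("".getBytes()));
            }
            @Override
            public void startElement(String uri, String localName, String qName, Attributes attrs) {
                if ("book".equals(qName)) books[0]++;
            }
        };
        parser.parse(new ByteArrayInputStream(xml.getBytes()), handler);
        return books[0];
    }

    public static void main(String[] args) throws Exception {
        // The DOCTYPE points at a DTD that does not exist on this machine.
        String xml = "<!DOCTYPE books SYSTEM \"/filename.dtd\">"
                   + "<books><book>Some Book</book></books>";
        System.out.println("books = " + countBooks(xml));
    }
}
```

Without the resolveEntity override, this parse would fail with exactly the "No such file or directory" I/O error shown in the stack trace above.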
Re: Reading xml in java using spark
Another option might be to leverage spark-xml-utils (https://github.com/dmcbeath/spark-xml-utils). This is a collection of xml utilities that I've recently revamped that make it relatively easy to use xpath, xslt, or xquery within the context of a Spark application (or at least I think so). My previous attempt was not overly friendly, but as I've learned more about Spark (and needed easier to use xml utilities) I've hopefully made this easier to use and understand. I hope others find it useful. Back to your problem. Assuming you have a bunch of xml records in an RDD, you should be able to do something like the following to count the number of elements of that type. In the example below, I'm counting the number of references in documents. The xmlKeyPair is an RDD of type (String,String) where the first item is the 'key' and the second item is the xml record. The xpath expression identifies the 'reference' element I want to count.

import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import scala.collection.JavaConverters._
import java.util.HashMap

xmlKeyPair.mapPartitions(recsIter => {
  val xpath = "count(/xocs:doc/xocs:meta/xocs:references/xocs:ref-info)"
  val namespaces = new HashMap[String,String](Map(
    "xocs" -> "http://www.elsevier.com/xml/xocs/dtd"
  ).asJava)
  val proc = XPathProcessor.getInstance(xpath, namespaces)
  recsIter.map(rec => proc.evaluateString(rec._2).toInt)
}).sum

There is more documentation on the spark-xml-utils github site. Let me know if the documentation is not clear or if you have any questions. Darin.

From: Rick Hillegas To: Sonal Goyal Cc: rakesh sharma; user@spark.apache.org Sent: Monday, August 31, 2015 10:51 AM Subject: Re: Reading xml in java using spark

Hi Rakesh, You might also take a look at the Derby code. org.apache.derby.vti.XmlVTI provides a number of static methods for turning an XML resource into a JDBC ResultSet.
Thanks, -Rick

On 8/31/15 4:44 AM, Sonal Goyal wrote: I think the mahout project had an XmlInputFormat which you can leverage. >On Aug 31, 2015 5:10 PM, "rakesh sharma" wrote: > >I want to parse an xml file in spark >>But as far as example is concerned it reads it as text file. The mapping to >>xml will be a tedious job. >>How can I find the number of elements of a particular type using that? Any >>help in java/scala code is also welcome >> >> >>thanks >>rakesh
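For readers without a Spark cluster handy, the count(...) xpath idea above can be tried with nothing but the JDK's javax.xml.xpath. This is only an analogy to the spark-xml-utils XPathProcessor call; the sample document, namespace URI, and class name are my own invention.

```java
import java.io.StringReader;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

public class XPathCountDemo {

    // Evaluate a namespace-aware count(...) expression against an xml string.
    static int count(String xml, String expr, final String prefix, final String nsUri) throws Exception {
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true); // required, or the prefixed xpath steps won't match
        Document doc = dbf.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
        XPath xpath = XPathFactory.newInstance().newXPath();
        xpath.setNamespaceContext(new NamespaceContext() {
            public String getNamespaceURI(String p) {
                return prefix.equals(p) ? nsUri : XMLConstants.NULL_NS_URI;
            }
            public String getPrefix(String uri) { return null; }
            public Iterator<String> getPrefixes(String uri) { return null; }
        });
        // count(...) returns a Double under XPathConstants.NUMBER
        return ((Double) xpath.evaluate(expr, doc, XPathConstants.NUMBER)).intValue();
    }

    public static void main(String[] args) throws Exception {
        String xml = "<ex:doc xmlns:ex=\"http://example.com/ns\">"
                   + "<ex:ref/><ex:ref/><ex:ref/></ex:doc>";
        System.out.println(count(xml, "count(/ex:doc/ex:ref)", "ex", "http://example.com/ns"));
    }
}
```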
Please add the Cincinnati spark meetup to the list of meet ups
http://www.meetup.com/Cincinnati-Apache-Spark-Meetup/ Thanks. Darin.
Running into several problems with Data Frames
I decided to play around with DataFrames this morning but I'm running into quite a few issues. I'm assuming that I must be doing something wrong so would appreciate some advice. First, I create my DataFrame.

import sqlContext.implicits._
case class Entity(InternalId: Long, EntityId: Long, EntityType: String, CustomerId: String, EntityURI: String, NumDocs: Long)
val entities = sc.textFile("s3n://darin/Entities.csv")
val entitiesArr = entities.map(v => v.split('|'))
val dfEntity = entitiesArr.map(arr => Entity(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4), arr(5).toLong)).toDF()

Second, I verify the schema.

dfEntity.printSchema
root
|-- InternalId: long (nullable = false)
|-- EntityId: long (nullable = false)
|-- EntityType: string (nullable = true)
|-- CustomerId: string (nullable = true)
|-- EntityURI: string (nullable = true)
|-- NumDocs: long (nullable = false)

Third, I verify I can select a column.

dfEntity.select("InternalId").limit(10).show()
InternalId
1 2 3 4 5 6 7 8 9 10

But, things then start to break down. Let's assume I want to filter so I only have records where the InternalId is > 5.

dfEntity.filter("InternalId" > 5L).count()

But, this gives me the following error message. Doesn't the schema above indicate the InternalId column should be of type Long?

console:42: error: type mismatch;
found : Long(5L)
required: String
dfEntity.filter("InternalId" > 5L).count()

I then try the following:

dfEntity.filter(dfEntity("InternalId") > 5L).count()

Now, this gives me the following error instead.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 153.0 failed 4 times, most recent failure: Lost task 13.3 in stage 153.0 (TID 1636, ip-10-0-200-6.ec2.internal): java.lang.ArrayIndexOutOfBoundsException

I'm using Apache Spark 1.3. Thanks. Darin.
repartitionAndSortWithinPartitions and mapPartitions and sort order
I am using repartitionAndSortWithinPartitions to partition my content and then sort within each partition. I've also created a custom partitioner that I use with repartitionAndSortWithinPartitions. I created a custom partitioner because my key consists of something like 'groupid|timestamp' and I only want to partition on the group id, but I want to sort the records in each partition using the entire key (groupid and the timestamp). My question is whether, when I use mapPartitions (to process the records in each partition), the order in each partition will be guaranteed (from the sort) as I iterate through the records. As I iterate, while processing the current record I need to look at the previous record and the next record in the partition, and I need to make sure the records are processed in the sorted order. I tend to think so, but wanted to confirm. Thanks. Darin.
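The previous/current/next scan described above can be sketched in plain Java: within a partition produced by repartitionAndSortWithinPartitions, the iterator handed to mapPartitions yields records in sorted key order, so a sliding window over that iterator is enough. The record format ('groupid|timestamp' strings) and class name here are my own illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SlidingScan {

    // Walk a sorted iterator while keeping the previous and next record in view.
    static List<String> scan(Iterator<String> sortedKeys) {
        List<String> out = new ArrayList<>();
        String prev = null;
        String curr = sortedKeys.hasNext() ? sortedKeys.next() : null;
        while (curr != null) {
            String next = sortedKeys.hasNext() ? sortedKeys.next() : null;
            // Process 'curr' with visibility of both neighbors.
            out.add(prev + "|" + curr + "|" + next);
            prev = curr;
            curr = next;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("g1|100", "g1|200", "g1|300");
        System.out.println(scan(keys.iterator()));
    }
}
```

Because only one record of lookahead is buffered, the scan stays streaming, which matters when a partition is too large to collect into memory.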
Question about the spark assembly deployed to the cluster with the ec2 scripts
I've downloaded spark 1.2.0 to my laptop. In the lib directory, it includes spark-assembly-1.2.0-hadoop2.4.0.jar When I spin up a cluster using the ec2 scripts with 1.2.0 (and set --hadoop-major-version=2) I notice that in the lib directory for the master/slaves the assembly is for hadoop2.0.0 (and I think Cloudera). Is there a way that I can force the install of the same assembly to the cluster that comes with the 1.2 download of spark? Thanks. Darin.
Re: Question about Spark best practice when counting records.
Thanks for your quick reply. Yes, that would be fine. I would rather wait/use the optimal approach as opposed to hacking some one-off solution. Darin.

From: Kostas Sakellis kos...@cloudera.com To: Darin McBeath ddmcbe...@yahoo.com Cc: User user@spark.apache.org Sent: Friday, February 27, 2015 12:19 PM Subject: Re: Question about Spark best practice when counting records.

Hey Darin, Record count metrics are coming in Spark 1.3. Can you wait until it is released? Or do you need a solution in older versions of Spark? Kostas

On Friday, February 27, 2015, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: I have a fairly large Spark job where I'm essentially creating quite a few RDDs, do several types of joins using these RDDs resulting in a final RDD which I write back to S3. Along the way, I would like to capture record counts for some of these RDDs. My initial approach was to use the count action on some of these intermediate RDDs (and cache them since the count would force the materialization of the RDD and the RDD would be needed again later). This seemed to work 'ok' when my RDDs were fairly small/modest but as they grew in size I started to experience problems. After watching a recent very good screencast on performance, this doesn't seem the correct approach as I believe I'm really breaking (or hindering) the pipelining concept in Spark. If I remove all of my counts, I'm only left with the one job/action (save as Hadoop file at the end). Spark then seems to run smoother (and quite a bit faster) and I really don't need (or want) to even cache any of my intermediate RDDs. So, the approach I've been kicking around is to use accumulators instead. I was already using them to count 'bad' records but why not 'good' records as well? I realize that if I lose a partition that I might over count, but perhaps that is an acceptable trade-off.
I'm guessing that others have run into this before so I would like to learn from the experience of others and how they have addressed this. Thanks. Darin.
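The accumulator trade-off discussed in this thread (counts tallied as a side effect of the single pass, instead of extra count() actions that force materialization) can be illustrated without Spark at all. This plain-Java sketch uses LongAdder as a stand-in for a Spark accumulator; the record format and class name are my own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

public class SideEffectCounts {

    // One pass over the records: transform the good ones, tally both kinds.
    static long[] countGoodBad(List<String> records) {
        LongAdder good = new LongAdder();   // stand-in for a Spark accumulator
        LongAdder bad = new LongAdder();
        List<String> processed = new ArrayList<>();
        for (String r : records) {          // the single "job"
            if (r.startsWith("ok:")) {
                good.increment();
                processed.add(r.toUpperCase()); // the real transformation
            } else {
                bad.increment();            // counted without a second pass
            }
        }
        return new long[] { good.sum(), bad.sum() };
    }

    public static void main(String[] args) {
        long[] gb = countGoodBad(Arrays.asList("ok:1", "ok:2", "corrupt", "ok:3"));
        System.out.println("good=" + gb[0] + " bad=" + gb[1]);
    }
}
```

The counts fall out of the pass that does the real work, so no intermediate result has to be cached just to count it. The over-counting caveat from the thread still applies in Spark: if a task is re-executed, accumulator updates inside transformations can be applied more than once.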
job keeps failing with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
I'm using Spark 1.2, stand-alone cluster on ec2. I have a cluster of 8 r3.8xlarge machines but limit the job to only 128 cores. I have also tried other things such as setting 4 workers per r3.8xlarge and 67gb each but this made no difference. The job frequently fails at the end in this step (saveAsHadoopFile). It will sometimes work. finalNewBaselinePairRDD is hash partitioned with 1024 partitions and a total size around 1TB. There are about 13.5M records in finalNewBaselinePairRDD. finalNewBaselinePairRDD is a JavaPairRDD<String,String>.

JavaPairRDD<Text, Text> finalBaselineRDDWritable = finalNewBaselinePairRDD.mapToPair(new ConvertToWritableTypes()).persist(StorageLevel.MEMORY_AND_DISK_SER());

// Save to hdfs (gzip)
finalBaselineRDDWritable.saveAsHadoopFile("hdfs:///sparksync/", Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class);

If anyone has any tips for what I should look into it would be appreciated. Thanks. Darin.
Re: Which OutputCommitter to use for S3?
Just to close the loop in case anyone runs into the same problem I had. By setting --hadoop-major-version=2 when using the ec2 scripts, everything worked fine. Darin.

- Original Message - From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org Sent: Monday, February 23, 2015 3:16 PM Subject: Re: Which OutputCommitter to use for S3?

Thanks. I think my problem might actually be the other way around. I'm compiling with hadoop 2, but when I startup Spark, using the ec2 scripts, I don't specify a -hadoop-major-version and the default is 1. I'm guessing that if I make that a 2 that it might work correctly. I'll try it and post a response.

- Original Message - From: Mingyu Kim m...@palantir.com To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org Sent: Monday, February 23, 2015 3:06 PM Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh! Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2. Mingyu

On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote: Aaron. Thanks for the class. Since I'm currently writing Java based Spark applications, I tried converting your class to Java (it seemed pretty straightforward). I set up the use of the class as follows:

SparkConf conf = new SparkConf()
    .set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter");

And I then try and save a file to S3 (which I believe should use the old hadoop apis).
JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected
at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with hadoop 2.4. Thanks. Darin.

From: Aaron Davidson ilike...@gmail.com To: Andrew Ash and...@andrewash.com Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; user@spark.apache.org; Aaron Davidson aa...@databricks.com Sent: Saturday, February 21, 2015 7:01 PM Subject: Re: Which OutputCommitter to use for S3?
Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs; I believe the new Hadoop APIs strongly tie committer to input format (so FileInputFormat always uses FileOutputCommitter), which makes this fix more difficult to apply.

On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote: Josh is that class something you guys would consider open sourcing, or would you rather the community step up and create an OutputCommitter implementation optimized for S3? On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote: We (Databricks) use our own DirectOutputCommitter implementation, which is a couple tens of lines of Scala code. The class would almost entirely be a no-op except we took some care to properly handle the _SUCCESS file. On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote: I didn't get any response. It'd be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu From: Mingyu Kim
Re: Which OutputCommitter to use for S3?
Aaron. Thanks for the class. Since I'm currently writing Java based Spark applications, I tried converting your class to Java (it seemed pretty straightforward). I set up the use of the class as follows:

SparkConf conf = new SparkConf()
    .set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter");

And I then try and save a file to S3 (which I believe should use the old hadoop apis).

JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected
at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with hadoop 2.4. Thanks. Darin.
From: Aaron Davidson ilike...@gmail.com To: Andrew Ash and...@andrewash.com Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; user@spark.apache.org; Aaron Davidson aa...@databricks.com Sent: Saturday, February 21, 2015 7:01 PM Subject: Re: Which OutputCommitter to use for S3?

Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs; I believe the new Hadoop APIs strongly tie committer to input format (so FileInputFormat always uses FileOutputCommitter), which makes this fix more difficult to apply.

On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote: Josh is that class something you guys would consider open sourcing, or would you rather the community step up and create an OutputCommitter implementation optimized for S3? On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote: We (Databricks) use our own DirectOutputCommitter implementation, which is a couple tens of lines of Scala code. The class would almost entirely be a no-op except we took some care to properly handle the _SUCCESS file. On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote: I didn't get any response. It'd be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu

From: Mingyu Kim m...@palantir.com Date: Monday, February 16, 2015 at 1:15 AM To: user@spark.apache.org Subject: Which OutputCommitter to use for S3?

Hi all, The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant operation in S3, as discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
People seem to develop their own NullOutputCommitter implementation or use DirectFileOutputCommitter (as mentioned in SPARK-3595), but I wanted to check if there is a de facto standard, publicly available OutputCommitter to use for S3 in conjunction with Spark. Thanks, Mingyu
Re: Which OutputCommitter to use for S3?
Thanks. I think my problem might actually be the other way around. I'm compiling with hadoop 2, but when I startup Spark, using the ec2 scripts, I don't specify a -hadoop-major-version and the default is 1. I'm guessing that if I make that a 2 that it might work correctly. I'll try it and post a response.

- Original Message - From: Mingyu Kim m...@palantir.com To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org Sent: Monday, February 23, 2015 3:06 PM Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh! Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2. Mingyu

On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote: Aaron. Thanks for the class. Since I'm currently writing Java based Spark applications, I tried converting your class to Java (it seemed pretty straightforward). I set up the use of the class as follows:

SparkConf conf = new SparkConf()
    .set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter");

And I then try and save a file to S3 (which I believe should use the old hadoop apis).

JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected
at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with hadoop 2.4. Thanks. Darin.

From: Aaron Davidson ilike...@gmail.com To: Andrew Ash and...@andrewash.com Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; user@spark.apache.org; Aaron Davidson aa...@databricks.com Sent: Saturday, February 21, 2015 7:01 PM Subject: Re: Which OutputCommitter to use for S3?

Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs; I believe the new Hadoop APIs strongly tie committer to input format (so FileInputFormat always uses FileOutputCommitter), which makes this fix more difficult to apply.
On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:

Josh, is that class something you guys would consider open sourcing, or would you rather the community step up and create an OutputCommitter implementation optimized for S3?

On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

We (Databricks) use our own DirectOutputCommitter implementation, which is a couple dozen lines of Scala code. The class is almost entirely a no-op, except we took some care to properly handle the _SUCCESS file.

On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

I didn't get any response. It'd be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu

From: Mingyu Kim m...@palantir.com Date: Monday, February 16, 2015 at 1:15 AM To: user@spark.apache.org Subject: Which OutputCommitter to use for S3?

Hi all, The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant-time operation in S3, as discussed in http://mail-archives.apa che.org_mod-5Fmbox_spark
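To make the discussion above concrete, here is a toy illustration (plain JDK code, not Spark or Hadoop — the class and paths are invented for the example) of the two-phase commit that FileOutputCommitter performs: tasks write to a `_temporary` attempt directory, and commit renames the output into place. On a normal filesystem that rename is a cheap metadata operation; on S3 it degenerates into a copy plus delete, which is exactly the cost a "direct" committer avoids by writing to the final location up front and making commitJob a no-op.

```java
import java.io.IOException;
import java.nio.file.*;

// Toy model of FileOutputCommitter's two-phase output protocol.
public class CommitDemo {
    static boolean twoPhaseCommit() throws IOException {
        Path out = Files.createTempDirectory("job-output");

        // Phase 1: the task writes into a temporary attempt directory.
        Path attempt = out.resolve("_temporary").resolve("attempt_0");
        Files.createDirectories(attempt);
        Files.write(attempt.resolve("part-00000"), "record-1\n".getBytes());

        // Phase 2: commit promotes the output by renaming it into place.
        // Cheap on a real filesystem; on S3 a "rename" is a full copy + delete,
        // which is why a no-op/direct committer helps there.
        Files.move(attempt.resolve("part-00000"), out.resolve("part-00000"));
        return Files.exists(out.resolve("part-00000"));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(twoPhaseCommit());  // true: output visible after commit
    }
}
```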
Incorrect number of records after left outer join (I think)
Consider the following left outer join:

potentialDailyModificationsRDD = reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER());

Below are the record counts for the RDDs involved:

Number of records for reducedDailyPairRDD: 2565206
Number of records for baselinePairRDD: 56102812
Number of records for potentialDailyModificationsRDD: 2570115

Below are the partitioners for the RDDs involved:

Partitioner for reducedDailyPairRDD: Some(org.apache.spark.HashPartitioner@400)
Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400)
Partitioner for potentialDailyModificationsRDD: Some(org.apache.spark.HashPartitioner@400)

I realize the .partitionBy in the above statement is probably not needed, as the underlying RDDs used in the left outer join are already hash partitioned. My question is how the resulting RDD (potentialDailyModificationsRDD) can end up with more records than reducedDailyPairRDD. I would think the number of records in potentialDailyModificationsRDD should be 2565206 instead of 2570115. Am I missing something, or is this possibly a bug? I'm using Apache Spark 1.2 on a stand-alone cluster on EC2. To get the counts for the records, I'm using the RDD's .count(). Thanks. Darin.
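One plausible arithmetic explanation (an assumption, not confirmed in the thread): a left outer join emits one record per matching (left, right) pair, so any key that occurs more than once on the right side inflates the output beyond the left side's count. The plain-Java sketch below (no Spark; the method and data are invented for illustration) reproduces the counting rule:

```java
import java.util.*;

// Counting model for leftOuterJoin: a key appearing m times on the left and
// n >= 1 times on the right yields m * n output records; an unmatched left
// key still yields 1 record (with an absent right value).
public class JoinCountDemo {
    static int leftOuterJoinCount(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Integer> rightFreq = new HashMap<>();
        for (String k : rightKeys) rightFreq.merge(k, 1, Integer::sum);
        int total = 0;
        for (String k : leftKeys) {
            // Unmatched left keys still emit one record.
            total += Math.max(1, rightFreq.getOrDefault(k, 0));
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a", "b", "c");   // 3 left records
        List<String> right = Arrays.asList("a", "a", "b");  // key "a" duplicated
        // 3 left records, but "a" matches twice -> 4 output records.
        System.out.println(leftOuterJoinCount(left, right)); // prints 4
    }
}
```

So roughly 4,909 duplicate-key matches in baselinePairRDD would account for the 2570115 vs. 2565206 gap, if that hypothesis holds.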
Re: How do you get the partitioner for an RDD in Java?
Thanks Imran. That's exactly what I needed to know. Darin.

From: Imran Rashid iras...@cloudera.com To: Darin McBeath ddmcbe...@yahoo.com Cc: User user@spark.apache.org Sent: Tuesday, February 17, 2015 8:35 PM Subject: Re: How do you get the partitioner for an RDD in Java?

A JavaRDD is just a wrapper around a normal RDD defined in Scala, which is stored in the rdd field. You can access everything that way. The JavaRDD wrappers just provide some interfaces that are a bit easier to work with in Java. If this is at all convincing, here's me demonstrating it inside the spark-shell (yes, it's Scala, but I'm using the Java API):

scala> val jsc = new JavaSparkContext(sc)
jsc: org.apache.spark.api.java.JavaSparkContext = org.apache.spark.api.java.JavaSparkContext@7d365529

scala> val data = jsc.parallelize(java.util.Arrays.asList(Array("a", "b", "c")))
data: org.apache.spark.api.java.JavaRDD[Array[String]] = ParallelCollectionRDD[0] at parallelize at <console>:15

scala> data.rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None

On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote:

In an 'early release' of the Learning Spark book, there is the following reference: "In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java)." However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way of getting this information. I'm curious if anyone has any suggestions for how I might go about finding how an RDD is partitioned in a Java program. Thanks. Darin.
MapValues and Shuffle Reads
In the following code, I read in a large sequence file from S3 (1 TB) spread across 1024 partitions. When I look at the job/stage summary, I see about 400 GB of shuffle writes, which seems to make sense as I'm doing a hash partition on this file.

// Get the baseline input file
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD<String, String> hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution), and what I find very strange is that when I look at the job/stage summary, I see more than 340 GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle read here.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDD<String, Long> baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());
log.info("Number of baseline records: " + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin.
Re: MapValues and Shuffle Reads
Thanks Imran. I think you are probably correct. I was a bit surprised that there was no shuffle read in the initial hash partition step. I will adjust the code as you suggest to prove that is the case. I have a slightly different question. If I save an RDD to S3 (or some equivalent) and this RDD was hash partitioned at the time, do I still need to hash partition the RDD again when I read it in? Is there a way that I could prevent all of the shuffling (such as providing a hint)? My parts for the RDD will be gzipped, so they would not be splittable. In reality, that's what I would really want to do in the first place. Thanks again for your insights. Darin.

From: Imran Rashid iras...@cloudera.com To: Darin McBeath ddmcbe...@yahoo.com Cc: User user@spark.apache.org Sent: Tuesday, February 17, 2015 3:29 PM Subject: Re: MapValues and Shuffle Reads

Hi Darin, When you say you see 400GB of shuffle writes from the first code snippet, what do you mean? There is no action in that first set, so it won't do anything. By itself, it won't do any shuffle writing, or anything else for that matter. Most likely, the .count() in your second code snippet is actually causing the execution of some of the first snippet as well. The .partitionBy will result in both shuffle writes and shuffle reads, but they aren't set in motion until the .count further down the line. It's confusing because the stage boundaries don't line up exactly with your RDD variables here: hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged into the stage above it. If you do a hsfBaselinePairRDD.count() after your first code snippet, and then run the second code snippet afterwards, is it more like what you expect? Imran

On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In the following code, I read in a large sequence file from S3 (1TB) spread across 1024 partitions.
When I look at the job/stage summary, I see about 400GB of shuffle writes which seems to make sense as I'm doing a hash partition on this file.

// Get the baseline input file
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD<String, String> hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution) and what I find very strange is that when I look at the job/stage summary, I see more than 340GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle reads in this step.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDD<String, Long> baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());
log.info("Number of baseline records: " + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin.
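On Darin's follow-up question (whether a saved, hash-partitioned RDD must be re-partitioned on reload): Spark's HashPartitioner is a pure function of the key's hashCode and the partition count, so identical keys always land in identical partitions across runs. The sketch below models that assignment (modeled on `Utils.nonNegativeMod` in the Spark source; the class and key names here are invented for illustration). The catch is that Spark has no way to know that freshly read input splits already follow this layout, which is why the data gets shuffled again unless the application re-establishes a partitioner itself.

```java
// Model of HashPartitioner's partition assignment: deterministic, so the
// same key maps to the same partition in every job with the same count.
public class HashPartitionDemo {
    // Java's % can return negative values for negative hash codes.
    static int nonNegativeMod(int x, int mod) {
        int raw = x % mod;
        return raw < 0 ? raw + mod : raw;
    }

    static int partitionFor(Object key, int numPartitions) {
        return nonNegativeMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Same key, same partition count -> same partition, every run.
        int p1 = partitionFor("doc-42", 1024);
        int p2 = partitionFor("doc-42", 1024);
        System.out.println(p1 == p2);  // true
    }
}
```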
How do you get the partitioner for an RDD in Java?
In an 'early release' of the Learning Spark book, there is the following reference: "In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java)." However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way of getting this information. I'm curious if anyone has any suggestions for how I might go about finding how an RDD is partitioned in a Java program. Thanks. Darin.
Problems saving a large RDD (1 TB) to S3 as a sequence file
I've tried various ideas, but I'm really just shooting in the dark. I have an 8-node cluster of r3.8xlarge machines. The RDD (with 1024 partitions) I'm trying to save off to S3 is approximately 1 TB in size (with the partitions pretty evenly distributed in size). I just tried a test to dial back the number of executors on my cluster from using the entire cluster (256 cores) down to 128. Things seemed to get a bit farther (maybe) before the wheels started spinning off again. But the job always fails when all I'm trying to do is save the 1 TB file to S3. I see the following in my master log file:

15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316 because we got no heartbeat in 60 seconds
15/01/23 19:01:54 INFO master.Master: Removing worker worker-20150123172316 on
15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3

For the stage that eventually fails, I see the following summary information (min / 25th percentile / median / 75th percentile / max):

Summary Metrics for 729 Completed Tasks
Duration: 2.5 min / 4.8 min / 5.5 min / 6.3 min / 9.2 min
GC Time: 0 ms / 0.3 s / 0.4 s / 0.5 s / 5 s
Shuffle Read (Remote): 309.3 MB / 321.7 MB / 325.4 MB / 329.6 MB / 350.6 MB

So the max GC was only 5 s across 729 completed tasks, which sounds reasonable. As people tend to point to GC as the reason one loses executors, that does not appear to be my case. Here is a typical snapshot for some completed tasks; you can see that they tend to complete in approximately 6 minutes.
So, it takes about 6 minutes to write one partition to S3 (a partition being roughly 1 GB):

65 23619 0 SUCCESS ANY 5 / 2015/01/23 18:30:32 5.8 min 0.9 s 344.6 MB
59 23613 0 SUCCESS ANY 7 / 2015/01/23 18:30:32 6.0 min 0.4 s 324.1 MB
68 23622 0 SUCCESS ANY 1 / 2015/01/23 18:30:32 5.7 min 0.5 s 329.9 MB
62 23616 0 SUCCESS ANY 6 / 2015/01/23 18:30:32 5.8 min 0.7 s 326.4 MB
61 23615 0 SUCCESS ANY 3 / 2015/01/23 18:30:32 5.5 min 1 s 335.7 MB
64 23618 0 SUCCESS ANY 2 / 2015/01/23 18:30:32 5.6 min 2 s 328.1 MB

Then, towards the end, when things start heading south, I see the following. These tasks never complete, but you can see that they have taken more than 47 minutes (so far) before the job finally fails. Not really sure why.

671 24225 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
672 24226 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
673 24227 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
674 24228 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
675 24229 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
676 24230 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
677 24231 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
678 24232 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
679 24233 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
680 24234 0 RUNNING ANY 1 / 2015/01/23 18:59:17 47 min
681 24235 0 RUNNING ANY 1 / 2015/01/23 18:59:18 47 min
682 24236 0 RUNNING ANY 1 / 2015/01/23 18:59:18 47 min
683 24237 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
684 24238 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
685 24239 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
686 24240 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
687 24241 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
688 24242 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
689 24243 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
690 24244 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
691 24245 0 RUNNING ANY 5 / 2015/01/23 18:59:21 47 min

What's odd is that even on the same machine (see below) some tasks are still completing (in less than 5 minutes)
while other tasks on the same machine seem to be hung after 46 minutes. Keep in mind all I'm doing is saving the file to S3, so one would think the amount of work per task/partition would be fairly equal.

694 24248 0 SUCCESS ANY 0 / 2015/01/23 18:59:32 4.5 min 0.3 s 326.5 MB
695 24249 0 SUCCESS ANY 0 / 2015/01/23 18:59:32 4.5 min 0.3 s 330.8 MB
696 24250 0 RUNNING ANY 0 / 2015/01/23 18:59:32 46 min
697 24251
Re: Problems saving a large RDD (1 TB) to S3 as a sequence file
Thanks for the ideas, Sven. I'm using a stand-alone cluster (Spark 1.2). FWIW, I was able to get this running (just now). This is the first time it's worked in probably my last 10 attempts. In addition to limiting the executors to only 50% of the cluster, I added/changed the settings below. Maybe I just got lucky (although I think not). It would be good if someone could weigh in and agree that these changes are sensible. I'm also hoping the support for placement groups (targeted for 1.3 in the ec2 scripts) will help the situation. All in all, it takes about 45 minutes to write a 1 TB file back to S3 (as 1024 partitions).

SparkConf conf = new SparkConf()
  .setAppName("SparkSync Application")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.timeout", "600")        // Increased from 300
  .set("spark.akka.threads", "16")         // Added so that the default was increased from 4 to 16
  .set("spark.task.maxFailures", "64")     // Didn't really matter as I had no failures in this run
  .set("spark.storage.blockManagerSlaveTimeoutMs", "30");

From: Sven Krasser kras...@gmail.com To: Darin McBeath ddmcbe...@yahoo.com Cc: User user@spark.apache.org Sent: Friday, January 23, 2015 5:12 PM Subject: Re: Problems saving a large RDD (1 TB) to S3 as a sequence file

Hey Darin, Are you running this over EMR or as a standalone cluster? I've had occasional success in similar cases by digging through all executor logs and trying to find exceptions that are not caused by the application shutdown (but the logs remain my main pain point with Spark). That aside, another explanation could be S3 throttling you due to volume (and hence causing write requests to fail). You can try to split your file into multiple pieces and store those as S3 objects with different prefixes to make sure they end up in different partitions in S3.
See here for details: http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html. If that works, that'll narrow the cause down. Best, -Sven

On Fri, Jan 23, 2015 at 12:04 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: [original message quoted in full above]
Confused about shuffle read and shuffle write
I have the following code in a Spark job:

// Get the baseline input file(s)
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD<String, String> hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_ONLY_SER());

// Use 'substring' to extract epoch values.
JavaPairRDD<String, Long> baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_ONLY_SER());

When looking at the STAGE information for my job, I notice the following. To construct hsfBaselinePairRDD, it takes about 7.5 minutes, with 931 GB of input (from S3) and 377 GB of shuffle write (presumably because of the hash partitioning). This all makes sense. To construct baselinePairRDD, it also takes about 7.5 minutes, which I thought was a bit odd. But what I thought was really odd is that there was also 330 GB of shuffle read in this stage; I would have thought there should be zero shuffle read here. What I'm confused about is why there is any 'shuffle read' at all when constructing baselinePairRDD. If anyone could shed some light on this, it would be appreciated. Thanks. Darin.
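A likely factor (hedged here, and consistent with the lazy-evaluation explanation given elsewhere in this digest): Spark transformations do no work until an action runs, so all the cost, including the partitionBy's shuffle read, is billed to the stage that contains the action. Java streams behave analogously, which the self-contained sketch below demonstrates: the `map` does not execute until a terminal operation is invoked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Analogy only (Java streams, not Spark): intermediate operations are lazy,
// so the work shows up at the terminal operation -- much like Spark
// attributing shuffle read to the stage that ends in an action.
public class LazyDemo {
    static int[] run() {
        AtomicInteger mapCalls = new AtomicInteger();
        Stream<Integer> mapped = Stream.of(1, 2, 3)
                .map(x -> { mapCalls.incrementAndGet(); return x * 2; });
        int before = mapCalls.get();                 // map() has not run yet
        int sum = mapped.reduce(0, Integer::sum);    // terminal op triggers the pipeline
        return new int[]{before, mapCalls.get(), sum};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);  // 0 3 12
    }
}
```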
Re: Please help me get started on Apache Spark
Take a look at the O'Reilly Learning Spark (Early Release) book. I've found this very useful. Darin.

From: Saurabh Agrawal saurabh.agra...@markit.com To: user@spark.apache.org Sent: Thursday, November 20, 2014 9:04 AM Subject: Please help me get started on Apache Spark

Friends, I am pretty new to Spark, as much as to Scala, MLlib, and the entire Hadoop stack!! It would be so much help if I could be pointed to some good books on Spark and MLlib.
Further, does MLlib support any algorithms for B2B cross-sell/upsell or customer retention (out of the box, preferably) that I could run on my sales force data? I am currently using collaborative filtering, but that's essentially B2C. Thanks in advance!! Regards, Saurabh Agrawal
Confused why I'm losing workers/executors when writing a large file to S3
For one of my Spark jobs, my workers/executors are dying and leaving the cluster. On the master, I see something like the following in the log file. I'm surprised to see the '60' seconds in the master log below, because I explicitly set it to '600' (or so I thought) in my Spark job (see below). This is happening at the end of my job, when I'm trying to persist a large RDD (probably around 300+ GB) back to S3 (in 256 partitions). My cluster consists of 6 r3.8xlarge machines. The job works successfully when I'm outputting 100 GB or 200 GB. If you have any thoughts/insights, it would be appreciated. Thanks. Darin.

Here is where I'm setting the 'timeout' in my Spark job:

SparkConf conf = new SparkConf()
  .setAppName("SparkSync Application")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.core.connection.ack.wait.timeout", "600");

On the master, I see the following in the log file:

14/11/13 17:20:39 WARN master.Master: Removing worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 because we got no heartbeat in 60 seconds
14/11/13 17:20:39 INFO master.Master: Removing worker worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 on ip-10-35-184-232.ec2.internal:51877
14/11/13 17:20:39 INFO master.Master: Telling app of lost executor: 2

On a worker, I see something like the following in the log file.
14/11/13 17:20:58 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)
14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: I/O exception (java.net.SocketException) caught when processing request: Broken pipe
14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:32 INFO httpclient.HttpMethodDirector: I/O exception (java.net.SocketException) caught when processing request: Broken pipe
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark
14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, which exceeds the maximum retry count of 5
14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, which exceeds the maximum retry count of 5
14/11/13 17:22:57 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
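One hedged observation on the "60 seconds" puzzle above (an assumption worth verifying against your Spark version's docs, not a confirmed fix): in standalone mode, the master's worker-liveness window is governed by spark.worker.timeout (default 60), a master-side setting, which is separate from the per-application spark.core.connection.ack.wait.timeout set inside the job. That would explain why setting 600 in the application had no effect on the master's heartbeat message. A sketch of the relevant settings as they might appear in the master's spark-defaults.conf:

```
# Master-side worker heartbeat window (seconds); default is 60.
spark.worker.timeout                     600

# Per-application ack timeout (seconds); set in the job, does not
# affect the master's worker heartbeat check.
spark.core.connection.ack.wait.timeout   600
```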
ec2 script and SPARK_LOCAL_DIRS not created
I'm using Spark 1.1 and the provided ec2 scripts to start my cluster (r3.8xlarge machines). From the spark-shell, I can verify that the environment variables are set:

scala> System.getenv("SPARK_LOCAL_DIRS")
res0: String = /mnt/spark,/mnt2/spark

However, when I look on the workers, the directories /mnt/spark and /mnt2/spark do not exist. Am I missing something? Has anyone else noticed this? A colleague started a cluster (using the ec2 scripts) with m3.xlarge machines, and both the /mnt/spark and /mnt2/spark directories were created. Thanks. Darin.
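As a quick diagnostic, the kind of sanity check one could run on each worker is sketched below (plain JDK code; the class name and directory values are invented for the example, and a throwaway path is used so it is self-contained): split the comma-separated SPARK_LOCAL_DIRS value the way Spark would and create any directory that is missing.

```java
import java.io.IOException;
import java.nio.file.*;

// Check a comma-separated local-dirs list (like SPARK_LOCAL_DIRS) and
// create any directory that does not exist; returns how many were created.
public class LocalDirsCheck {
    static int ensureDirs(String localDirs) throws IOException {
        int created = 0;
        for (String d : localDirs.split(",")) {
            Path p = Paths.get(d.trim());
            if (!Files.isDirectory(p)) {
                Files.createDirectories(p);
                created++;
            }
        }
        return created;
    }

    public static void main(String[] args) throws IOException {
        // On a real worker one would pass System.getenv("SPARK_LOCAL_DIRS").
        String tmp = System.getProperty("java.io.tmpdir");
        String dirs = tmp + "/demo/mnt/spark," + tmp + "/demo/mnt2/spark";
        System.out.println(ensureDirs(dirs) + " directories created");
    }
}
```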
What should be the number of partitions after a union and a subtractByKey
Assume the following, where updatePairRDD and deletePairRDD are both hash partitioned. Before the union, each one has 512 partitions. The newly created updateDeletePairRDD has 1024 partitions. Is this the general/expected behavior for a union (for the number of partitions to double)?

JavaPairRDD<String, String> updateDeletePairRDD = updatePairRDD.union(deletePairRDD);

Then a similar question for subtractByKey. In the example below, baselinePairRDD is hash partitioned (with 512 partitions). We know from above that updateDeletePairRDD has 1024 partitions. The newly created workSubtractBaselinePairRDD has 512 partitions. This makes sense, because we are only 'subtracting' records from baselinePairRDD, and one wouldn't think the number of partitions would increase. Is this the general/expected behavior for subtractByKey?

JavaPairRDD<String, String> workSubtractBaselinePairRDD = baselinePairRDD.subtractByKey(updateDeletePairRDD);
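The observed counts match a simple model (a sketch under the assumption that union concatenates the two inputs' partition lists, which is what the 512 + 512 -> 1024 result indicates for the Spark versions discussed here; the class and keys below are invented for illustration). subtractByKey, by contrast, only filters the left side, so the left RDD's partition count is preserved:

```java
import java.util.*;
import java.util.stream.Collectors;

// Toy model of the two operations' effect on partitions.
public class PartitionDemo {
    // union: concatenate the partition lists, so counts add (512 + 512 -> 1024).
    static <T> List<List<T>> union(List<List<T>> a, List<List<T>> b) {
        List<List<T>> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // subtractByKey: filter each left partition in place; the left side's
    // partition count (and partitioner) carries through unchanged.
    static List<List<String>> subtractByKey(List<List<String>> left, Set<String> removeKeys) {
        return left.stream()
                .map(p -> p.stream().filter(k -> !removeKeys.contains(k)).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> a = Arrays.asList(Arrays.asList("k1"), Arrays.asList("k2"));
        List<List<String>> b = Arrays.asList(Arrays.asList("k3"), Arrays.asList("k4"));
        System.out.println(union(a, b).size());  // 2 + 2 = 4 partitions
        System.out.println(subtractByKey(a, new HashSet<>(Arrays.asList("k1"))).size());  // still 2
    }
}
```

Note that a concatenating union also drops the hash partitioner on the result, so a later partitionBy may be needed if downstream operations rely on co-partitioning.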
Question about RDD Union and SubtractByKey
I have the following code where I'm using RDD 'union' and 'subtractByKey' to create a new baseline RDD. All of my RDDs are key pairs with the 'key' a String and the 'value' a String (an XML document).

// Merge the daily deletes/updates/adds with the baseline

// Concat the Updates and Deletes into one PairRDD
JavaPairRDD<String,String> updateDeletePairRDD = updatePairRDD.union(deletePairRDD);

// Remove the update/delete keys from the baseline
JavaPairRDD<String,String> workSubtractBaselinePairRDD = baselinePairRDD.subtractByKey(updateDeletePairRDD);

// Add in the Adds
JavaPairRDD<String,String> workAddBaselinePairRDD = workSubtractBaselinePairRDD.union(addPairRDD);

// Add in the Updates
JavaPairRDD<String,String> newBaselinePairRDD = workAddBaselinePairRDD.union(updatePairRDD);

When I go to 'count' the newBaselinePairRDD:

// Output count for new baseline
log.info("Number of new baseline records: " + newBaselinePairRDD.count());

I'm getting the following exception (the above log.info is SparkSync.java:785). What I find odd is the reference to Spark SQL. So, I'm curious as to whether under the covers the RDD union and/or subtractByKey are implemented as Spark SQL. I wouldn't think so, but thought I would ask. I'm also suspicious of the reference to the '<' character and whether that is because of the XML document in the value portion of the key pair. Any insights would be appreciated. If there are thoughts on how to better approach my problem (even debugging), I would be interested in that as well. The updatePairRDD, deletePairRDD, baselinePairRDD, addPairRDD, and updateDeletePairRDD are all hashPartitioned. It's also a bit difficult to trace things because my application is a Java application and the stack trace references a lot of Scala, with very few references to my application other than one (SparkSync.java:785). My application is using Spark SQL for some other tasks, so perhaps an RDD (part) is being re-calculated and is resulting in this error.
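For what it's worth, the merge semantics described above can be sketched with plain Java collections (no Spark), using hypothetical sample keys/values. One caveat: RDD.union keeps duplicates, so the map overlay below is only equivalent because the update/delete keys are subtracted from the baseline first.

```java
import java.util.HashMap;
import java.util.Map;

public class BaselineMerge {

    // Simulates subtractByKey: keep baseline entries whose key is absent from 'remove'.
    static Map<String, String> subtractByKey(Map<String, String> baseline, Map<String, String> remove) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : baseline.entrySet()) {
            if (!remove.containsKey(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    // Approximates union for this pipeline. Note: RDD.union keeps duplicates;
    // an overlay is only equivalent here because touched keys were subtracted first.
    static Map<String, String> union(Map<String, String> a, Map<String, String> b) {
        Map<String, String> out = new HashMap<>(a);
        out.putAll(b);
        return out;
    }

    // Hypothetical sample data: doc2 is updated, doc3 deleted, doc4 added.
    static Map<String, String> sampleMerge() {
        Map<String, String> baseline = new HashMap<>();
        baseline.put("doc1", "<old1/>");
        baseline.put("doc2", "<old2/>");
        baseline.put("doc3", "<old3/>");
        Map<String, String> updates = new HashMap<>();
        updates.put("doc2", "<new2/>");
        Map<String, String> deletes = new HashMap<>();
        deletes.put("doc3", "<tombstone/>");
        Map<String, String> adds = new HashMap<>();
        adds.put("doc4", "<new4/>");

        Map<String, String> updateDelete = union(updates, deletes);        // concat updates+deletes
        Map<String, String> work = subtractByKey(baseline, updateDelete);  // drop touched keys
        return union(union(work, adds), updates);                          // add the adds, then updates
    }

    public static void main(String[] args) {
        System.out.println(sampleMerge());
    }
}
```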
But, based on other logging statements throughout my application, I don't believe this is the case. Thanks. Darin.

14/11/10 22:35:27 INFO scheduler.DAGScheduler: Failed to run count at SparkSync.java:785
14/11/10 22:35:27 WARN scheduler.TaskSetManager: Lost task 0.3 in stage 40.0 (TID 10674, ip-10-149-76-35.ec2.internal): com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@e8f759e; line: 1, column: 2]
 com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
 com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557)
 com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475)
 com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1415)
 com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679)
 com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024)
 com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
 com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:275)
 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:274)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62)
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 org.apache.spark.scheduler.Task.run(Task.scala:54)
Cincinnati, OH Meetup for Apache Spark
Let me know if you are interested in participating in a meet up in Cincinnati, OH to discuss Apache Spark. We currently have 4-5 different companies expressing interest but would like a few more. Darin.
XML Utilities for Apache Spark
I developed the spark-xml-utils library because we have a large amount of XML in big datasets, and I felt this data could be better served by providing some helpful XML utilities. These include the ability to filter documents based on an xpath/xquery expression, return specific nodes for an xpath/xquery expression, or transform documents using an xquery expression or an xslt stylesheet. By providing some basic wrappers to Saxon-HE, the spark-xml-utils library exposes some basic xpath, xslt, and xquery functionality that can readily be leveraged by any Spark application (including the spark-shell). We want to share this library with the community and are making it available under the Apache 2.0 license. For a point of reference, I was able to parse and apply a fairly complex xpath expression against 2 million documents (130GB total, 75KB/doc average) in less than 3 minutes on an AWS cluster (at spot price) costing less than $1/hr. When I have a chance, I will blog/write about some of my other investigations using spark-xml-utils. More about the project is available on GitHub (https://github.com/elsevierlabs/spark-xml-utils). There are examples for usage from the spark-shell as well as from a Java application. Feel free to use, contribute, and/or let us know how this library can be improved. Let me know if you have any questions. Darin.
Spark SQL and confused about number of partitions/tasks to do a simple join.
I have a SchemaRDD with 100 records in 1 partition. We'll call this baseline. I have a SchemaRDD with 11 records in 1 partition. We'll call this daily. After a fairly basic join of these two tables:

JavaSchemaRDD results = sqlContext.sql("SELECT id, action, daily.epoch, daily.version FROM baseline, daily WHERE key=id AND action='u' AND daily.epoch > baseline.epoch").cache();

I get a new SchemaRDD results with only 6 records (and the RDD has 200 partitions). When the job runs, I can see that 200 tasks were used to do this join. Does this make sense? I'm currently not doing anything special along the lines of partitioning (such as hash). Even if 200 tasks were required, since the result is only 6 records, shouldn't some of these empty partitions have been removed? I'm using Apache Spark 1.1 and I'm running this in local mode (local[1]). Any insight would be appreciated. Thanks. Darin.
Re: Spark SQL and confused about number of partitions/tasks to do a simple join.
ok. After reading some documentation, it would appear the issue is the default number of partitions for a join (200). After doing something like the following, I was able to change the value.

From: Darin McBeath ddmcbe...@yahoo.com.INVALID
To: User user@spark.apache.org
Sent: Wednesday, October 29, 2014 1:55 PM
Subject: Spark SQL and confused about number of partitions/tasks to do a simple join.
Re: Spark SQL and confused about number of partitions/tasks to do a simple join.
Sorry, hit the send key a bit too early. Anyway, this is the code I set:

sqlContext.sql("set spark.sql.shuffle.partitions=10");

From: Darin McBeath ddmcbe...@yahoo.com
To: Darin McBeath ddmcbe...@yahoo.com; User user@spark.apache.org
Sent: Wednesday, October 29, 2014 2:47 PM
Subject: Re: Spark SQL and confused about number of partitions/tasks to do a simple join.

ok. After reading some documentation, it would appear the issue is the default number of partitions for a join (200). After doing something like the following, I was able to change the value.
what's the best way to initialize an executor?
I have some code that only needs to be executed once per executor in my Spark application. My current approach is to do something like the following:

scala> xmlKeyPair.foreachPartition(i => XPathProcessor.init(ats, Namespaces/NamespaceContext))

So, if I understand correctly, XPathProcessor.init will be called once per partition. Since I have 48 partitions for this RDD and 2 million documents, that seems acceptable. The downside is that I will likely have fewer executors than 48 (each executor will handle more than one partition), so XPathProcessor.init would be called more than once per executor. I have code in place to make sure this is not an issue. But, I was wondering if there is a better way to accomplish something like this. Thanks. Darin.
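One common alternative (a sketch, not tied to any particular Spark API) is to make the init itself idempotent behind a static guard, so no matter how many partitions an executor processes, the expensive setup body runs at most once per JVM. The `initOnce` name and the counter are hypothetical stand-ins for the real one-time setup.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutorInit {

    private static final AtomicBoolean initialized = new AtomicBoolean(false);
    private static int initCalls = 0;

    // Idempotent init: only the first caller on this JVM performs the setup.
    // (compareAndSet makes the guard safe even if tasks run concurrently.)
    static void initOnce() {
        if (initialized.compareAndSet(false, true)) {
            initCalls++; // stand-in for the real one-time setup (e.g. XPathProcessor.init)
        }
    }

    static int calls() { return initCalls; }

    public static void main(String[] args) {
        // foreachPartition would invoke this once per partition;
        // only the first invocation actually does the work.
        for (int partition = 0; partition < 48; partition++) {
            initOnce();
        }
        System.out.println("init ran " + calls() + " time(s)");
    }
}
```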
confused about memory usage in spark
I have a PairRDD of type <String,String> which I persist to S3 using the following code:

JavaPairRDD<Text, Text> aRDDWritable = aRDD.mapToPair(new ConvertToWritableTypes());
aRDDWritable.saveAsHadoopFile(outputFile, Text.class, Text.class, SequenceFileOutputFormat.class);

class ConvertToWritableTypes implements PairFunction<Tuple2<String, String>, Text, Text> {
  public Tuple2<Text, Text> call(Tuple2<String, String> record) {
    return new Tuple2(new Text(record._1), new Text(record._2));
  }
}

When I look at the S3-reported size for one of the parts (say part-0), it indicates the size is 156MB. I then bring up a spark-shell, load this part-0, and cache it:

scala> val keyPair = sc.sequenceFile[String,String]("s3n://somebucket/part-0").cache()

After executing an action on the above RDD to force the cache, I look at the storage (using the Application UI) and it shows that I'm using 297MB for this RDD (when it was only 156MB in S3). I get that there could be some differences between the serialized storage format and what is then used in memory, but I'm curious as to whether I'm missing something and/or should be doing things differently. Thanks. Darin.
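For a rough feel of the gap, here is a back-of-the-envelope sketch. The heap numbers are illustrative assumptions (actual JVM overhead varies by JVM version and settings): a pre-Java-9 String stores 2 bytes per char plus object/array headers, while the S3 copy holds compact serialized bytes, so a roughly 2x inflation for mostly-ASCII XML is unsurprising.

```java
import java.nio.charset.StandardCharsets;

public class StringMemoryEstimate {

    // Rough heap footprint of a String on a 64-bit JVM (pre-Java-9 char[] backing):
    // assumed ~40 bytes of object headers/fields plus 2 bytes per char.
    // These constants are illustrative, not exact.
    static long approxHeapBytes(String s) {
        long headerAndFields = 40;
        return headerAndFields + 2L * s.length();
    }

    // What the same record costs when serialized as UTF-8 (as on disk / in S3).
    static long serializedBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String record = "<doc><title>sample</title></doc>";
        System.out.println("serialized (UTF-8): " + serializedBytes(record) + " bytes");
        System.out.println("in heap (approx):   " + approxHeapBytes(record) + " bytes");
    }
}
```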
Disabling log4j in Spark-Shell on ec2 stopped working on Wednesday (Oct 8)
For weeks, I've been using the following trick to successfully disable log4j in the spark-shell when running a cluster on ec2 that was started by the Spark-provided ec2 scripts:

cp ./conf/log4j.properties.template ./conf/log4j.properties

I then change log4j.rootCategory=INFO to log4j.rootCategory=WARN. This all stopped working on Wednesday, when I could no longer successfully start a cluster on ec2 (using the Spark-provided ec2 scripts). I noticed the resolution to this problem was that a script referenced by the ec2 scripts had been changed (and this referenced script has since been reverted). I raise this because I don't know whether it is a symptom of my problem, and it's interesting that the problems started happening at the same time. When I now start up the cluster on ec2 and subsequently start the spark-shell, I can no longer disable the log4j messages using the above trick. I'm using Apache Spark 1.1.0. What's interesting is that I can start the cluster locally on my laptop (using Spark 1.1.0) and the above trick for disabling log4j in the spark-shell works. So, the issue appears to be related to ec2 and potentially something referenced by the Spark-provided ec2 startup script. But, that is purely a guess on my part. I'm wondering if anyone else has noticed this issue and, if so, has a workaround. Thanks. Darin.
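For reference, the change amounts to a one-line edit in the copied file (assuming the stock Spark 1.1 template, which logs through a console appender):

```properties
# conf/log4j.properties (copied from conf/log4j.properties.template)
# Change the root logger from INFO to WARN to quiet the spark-shell
log4j.rootCategory=WARN, console
```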
How to pass env variables from master to executors within spark-shell
Can't seem to figure this out. I've tried several different approaches without success. For example, I've tried setting spark.executor.extraJavaOptions in spark-defaults.conf (prior to starting the spark-shell), but this seems to have no effect. Outside of the spark-shell (within a Java application I wrote), I successfully do the following:

// Set environment variables for the executors
conf.setExecutorEnv("AWS_ACCESS_KEY_ID", System.getenv("AWS_ACCESS_KEY_ID"));
conf.setExecutorEnv("AWS_SECRET_ACCESS_KEY", System.getenv("AWS_SECRET_ACCESS_KEY"));

But, because my SparkContext already exists within the spark-shell, this really isn't an option (unless I'm missing something). Thanks. Darin.
Issues with S3 client library and Apache Spark
I've seen a couple of issues posted about this, but I never saw a resolution. When I'm using Spark 1.0.2 (and the spark-submit script to submit my jobs) and AWS SDK 1.8.7, I get the stack trace below. However, if I drop back to AWS SDK 1.3.26 (or anything from the AWS SDK 1.4.* family), then everything works fine. It would appear that after AWS SDK 1.4, there became a dependency on HTTP Client 4.2 (instead of 4.1). I would like to use the more recent versions of the AWS SDK (and not use something nearly 2 years old), so I'm curious whether anyone has figured out a workaround to this problem. Thanks. Darin.

java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
 at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
 at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
 at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
 at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29)
 at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:97)
 at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:181)
 at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
 at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:408)
 at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:390)
 at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:374)
 at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:313)
 at com.elsevier.s3.SimpleStorageService.<clinit>(SimpleStorageService.java:27)
 at com.elsevier.spark.XMLKeyPair.call(SDKeyMapKeyPairRDD.java:75)
 at com.elsevier.spark.XMLKeyPair.call(SDKeyMapKeyPairRDD.java:65)
 at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
 at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:779)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:769)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:680)
Should the memory of worker nodes be constrained to the size of the master node?
I started up a cluster on EC2 (using the provided scripts) and specified a different instance type for the master and the worker nodes. The cluster started fine, but when I looked at the cluster (via port 8080), it showed that the amount of memory available to the worker nodes did not match the instance type I had specified. Instead, the amount of memory for the worker nodes matched the master node. I did verify that the correct instance types had been started for the master and worker nodes. Curious as to whether this is expected behavior or if this might be a bug? Thanks. Darin.
Is there any interest in handling XML within Spark ?
I've been playing around with Spark off and on for the past month and have developed some XML helper utilities that enable me to filter an XML dataset as well as transform an XML dataset (we have a lot of XML content). I'm posting this email to see if there would be any interest in this effort (as I would be happy to place the code in a public git repo) and/or to see if there is already something in place that provides this capability (so I'm not wasting my time). Under the covers, I'm leveraging Saxon-HE.

I'll first discuss the 'filtering' aspect. Assuming you have already created a PairRDD (with the key being the identifier for the XML document, and the value being the actual XML document), you could easily do the following from the spark-shell to filter this PairRDD based on an arbitrary XPath expression.

## Start the spark-shell (and copy the jar file to the executors)
[root@ip-10-233-73-204 spark]$ ./bin/spark-shell --jars lib/uber-SparkUtils-0.1.jar

## Bring in the sequence file (2 million records)
scala> val xmlKeyPair = sc.sequenceFile[String,String]("s3n://darin/xml/part*").cache()

## Test values against an xpath expression (need to import the class from my jar)
scala> import com.darin.xml.XPathEvaluation
scala> val resultsRDD = xmlKeyPair.filter{case(k,v) => XPathEvaluation.evaluateString(v, "/doc/meta[year='2010']")}

## Save the results as a hadoop sequence file
scala> resultsRDD.saveAsSequenceFile("s3n://darin/xml/results")

## Do more xpath expressions to create more filtered datasets, etc.

In my case, the initial PairRDD is about 130GB. With 2 million documents, this implies an average of around 65KB per document. On a small 3-node AWS cluster (m3.2xlarge), the above will execute in around 10 minutes. I currently use spot instances ($.08/hr each), so this is very economical. More complex XPath expressions could be used.
Assume a sample record structure of the following:

<person gender="male">
  <age>32</age>
  <hobbies>
    <hobby>tennis</hobby>
    <hobby>golf</hobby>
    <hobby>programming</hobby>
  </hobbies>
  <name>
    <given-name>Darin</given-name>
    <surname>McBeath</surname>
  </name>
  <address>
    <street>8000 Park Lake Dr</street>
    <city>Mason</city>
    <state>Ohio</state>
  </address>
</person>

The following XPath expressions could be used.

// Exact match where the surname equals 'McBeath'
exists(/person/name[surname='McBeath'])

// Exact match where the person gender attribute equals 'male'
exists(/person[@gender='male'])

// Where the person age is between 30 and 40
exists(/person[(xs:integer(age) >= 30) and (xs:integer(age) <= 40)])

// Exact match (after lower-case conversion) where the surname equals 'mcbeath'
exists(/person/name[lower-case(string-join(surname/text(),' '))='mcbeath'])

// Exact match (after lower-case conversion) where within a name a surname equals 'mcbeath' and given-name equals 'darin'
exists(/person[name[lower-case(string-join(surname/text(),' '))='mcbeath' and lower-case(string-join(given-name/text(),' '))='darin']])

// Exact match (after lower-case conversion) where within a name a surname equals 'mcbeath' and given-name equals 'darin' or 'darby'
exists(/person[name[lower-case(string-join(surname/text(),' '))='mcbeath' and lower-case(string-join(given-name/text(),' '))=('darin','darby')]])

// Search/Token match (after lower-case conversion) where an immediate text node(s) of street contains the token 'lake'
exists(/person/address[tokenize(lower-case(string-join(street/text(),' ')),'\\W+') = 'lake'])

// Search/Token match (after lower-case conversion) where any text node descendant of person contains the token 'lake'
exists(/person[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = 'lake'])

// Search/Token 'wildcard' match (after lower-case conversion) where an immediate text node(s) of street contains a token matching the wildcard expression 'pa*'
exists(/person/address[(for $i in tokenize(lower-case(string-join(street/text(),' ')),'\\W+') return matches($i,'pa.+')) = true()])

// Search/Token 'wildcard' match (after lower-case conversion) where an immediate text node(s) of street contains a token matching the wildcard expression 'pa?k'
exists(/person/address[(for $i in tokenize(lower-case(string-join(street/text(),' ')),'\\W+') return matches($i,'pa.?k')) = true()])

// Exact match where the first hobby is 'tennis'
exists(/person/hobbies/hobby[position() = 1 and ./text() = 'tennis'])

// Exact match where the first or second hobby is 'tennis'
exists(/person/hobbies/hobby[position() <= 2 and ./text() = 'tennis'])

// Exact match where the state does not equal 'Ohio'
not(exists(//state[.='Ohio']))

// Search/Token match (after lower-case conversion) where any text node descendant of person contains the phrase 'park lake'
exists(/person[matches(string-join(tokenize(lower-case(string-join(.//text(),' ')),'\\W+'),' '), 'park lake')])

This is very much a work in progress. But, I'm curious as to whether there is interest in the Spark community for something like this. I have also done something
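As a sanity check of the sample document, a filter like the first expression can be reproduced with the JDK's built-in XPath 1.0 engine (Saxon-HE is what supplies the XPath 2.0 functions such as lower-case() and tokenize() used above). This standalone sketch uses the hypothetical person record:

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

public class XPathFilterSketch {

    // Hypothetical sample record, matching the structure discussed above.
    static final String PERSON =
        "<person gender=\"male\"><age>32</age>"
      + "<name><given-name>Darin</given-name><surname>McBeath</surname></name>"
      + "<address><street>8000 Park Lake Dr</street><city>Mason</city><state>Ohio</state></address>"
      + "</person>";

    // Evaluate an XPath expression as a boolean against an XML string
    // (XPath 1.0 here; Saxon-HE would add 2.0 features).
    static boolean matches(String xml, String xpath) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
            return (Boolean) XPathFactory.newInstance().newXPath()
                .evaluate(xpath, doc, XPathConstants.BOOLEAN);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(PERSON, "/person/name[surname='McBeath']"));
        System.out.println(matches(PERSON, "/person[@gender='female']"));
    }
}
```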
Re: Number of partitions and Number of concurrent tasks
Ok, I set the number of spark worker instances to 2 (below is my startup command). But this essentially had the effect of increasing my number of workers from 3 to 6 (which was good) while also reducing my number of cores per worker from 8 to 4 (which was not so good). In the end, I would still only be able to concurrently process 24 partitions in parallel. I'm starting a stand-alone cluster using the Spark-provided ec2 scripts. I tried setting the env variable for SPARK_WORKER_CORES in spark_ec2.py, but this had no effect. So, it's not clear whether I can even set SPARK_WORKER_CORES with the ec2 scripts. Anyway, not sure if there is anything else I can try, but at least I wanted to document what I did try and the net effect. I'm open to any suggestions/advice.

./spark-ec2 -k key -i key.pem --hadoop-major-version=2 launch -s 3 -t m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2 my-cluster

From: Daniel Siegmann daniel.siegm...@velos.io
To: Darin McBeath ddmcbe...@yahoo.com
Cc: Daniel Siegmann daniel.siegm...@velos.io; user@spark.apache.org
Sent: Thursday, July 31, 2014 10:04 AM
Subject: Re: Number of partitions and Number of concurrent tasks

I haven't configured this myself. I'd start with setting SPARK_WORKER_CORES to a higher value, since that's a bit simpler than adding more workers. This defaults to all available cores according to the documentation, so I'm not sure if you can actually set it higher. If not, you can get around this by adding more worker instances; I believe simply setting SPARK_WORKER_INSTANCES to 2 would be sufficient. I don't think you have to set the cores if you have more workers - it will default to 8 cores per worker (in your case). But maybe 16 cores per node will be too many. You'll have to test. Keep in mind that more workers means more memory and such too, so you may need to tweak some other settings downward in this case.
On a side note: I've read that some people found performance was better when they had more workers with less memory each, instead of a single worker with tons of memory, because it cut down on garbage collection time. But I can't speak to that myself. In any case, if you increase the number of cores available in your cluster (whether per worker, or adding more workers per node, or of course adding more nodes), you should see more tasks running concurrently. Whether this will actually be faster probably depends mainly on whether the CPUs in your nodes were really being fully utilized with the current number of cores.

-- Daniel Siegmann, Software Developer, Velos Accelerating Machine Learning, 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001, E: daniel.siegm...@velos.io, W: www.velos.io
Number of partitions and Number of concurrent tasks
I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1. I have an RDD<String> which I've repartitioned so it has 100 partitions (hoping to increase the parallelism). When I do a transformation (such as filter) on this RDD, I can't seem to get more than 24 tasks (my total number of cores across the 3 nodes) going at one point in time. By tasks, I mean the number of tasks that appear under the Application UI. I tried explicitly setting spark.default.parallelism to 48 (hoping I would get 48 tasks running concurrently) and verified this in the Application UI for the running application, but this had no effect. Perhaps this is ignored for a 'filter' and the default is the total number of cores available. I'm fairly new to Spark, so maybe I'm just missing or misunderstanding something fundamental. Any help would be appreciated. Thanks. Darin.
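The arithmetic behind the 24-task ceiling can be sketched directly: with one task per partition and at most one task per core at a time, a stage completes in ceil(partitions / totalCores) waves. The numbers below match the cluster described above (3 nodes x 8 cores).

```java
public class TaskWaves {

    // Number of scheduling "waves" for a stage:
    // one task per partition, one running task per core at a time.
    static int waves(int partitions, int totalCores) {
        return (partitions + totalCores - 1) / totalCores; // ceil division
    }

    public static void main(String[] args) {
        int nodes = 3, coresPerNode = 8;
        int totalCores = nodes * coresPerNode;      // 24 concurrent tasks, as observed
        System.out.println(waves(100, totalCores)); // 100 partitions run in 5 waves
    }
}
```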
Re: Number of partitions and Number of concurrent tasks
Thanks. So to make sure I understand: since I'm using a 'stand-alone' cluster, I would set SPARK_WORKER_INSTANCES to something like 2 (instead of the default value of 1). Is that correct? But, it also sounds like I need to explicitly set a value for SPARK_WORKER_CORES (based on what the documentation states). What would I want that value to be based on my configuration below? Or, would I leave that alone?

From: Daniel Siegmann daniel.siegm...@velos.io
To: user@spark.apache.org; Darin McBeath ddmcbe...@yahoo.com
Sent: Wednesday, July 30, 2014 5:58 PM
Subject: Re: Number of partitions and Number of concurrent tasks

This is correct behavior. Each core can execute exactly one task at a time, with each task corresponding to a partition. If your cluster only has 24 cores, you can only run at most 24 tasks at once. You could run multiple workers per node to get more executors. That would give you more cores in the cluster. But however many cores you have, each core will run only one task at a time.

-- Daniel Siegmann, Software Developer, Velos Accelerating Machine Learning, 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001, E: daniel.siegm...@velos.io, W: www.velos.io