Re: json_tuple fails to parse string with emoji

2017-01-26 Thread Andrew Ehrlich
It looks like I'm hitting this bug in jackson-core 2.2.3 which is included
in the version of CDH I'm on:
https://github.com/FasterXML/jackson-core/issues/115

Jackson-core 2.3.0 has the fix.

On Tue, Jan 24, 2017 at 5:14 PM, Andrew Ehrlich <and...@aehrlich.com> wrote:

> On Spark 1.6.0, calling json_tuple() with an emoji character in one of the
> values returns nulls:
>
> Input:
> """
> "myJsonBody": {
>   "field1": ""
> }
> """
>
> Query:
> """
> ...
> LATERAL VIEW JSON_TUPLE(e.myJsonBody,'field1') k AS field1,
> ...
>
> """
>
> This looks like a platform-dependent issue; the parsing works fine on my
> local computer (OS X, 1.6.3) and fails on the remote cluster (CentOS 7, 1.6.0).
>
> I noticed that in 1.6.0, json_tuple was implemented this way:
> https://github.com/apache/spark/pull/7946/files
>
> So far I have:
>
>- Checked all java system properties related to charsets on drivers
>and executors
>- Turned up logging to debug level and checked for relevant messages
>
> Any more input? Should I try the dev mailing list?
>


json_tuple fails to parse string with emoji

2017-01-24 Thread Andrew Ehrlich
On Spark 1.6.0, calling json_tuple() with an emoji character in one of the
values returns nulls:

Input:
"""
"myJsonBody": {
  "field1": ""
}
"""

Query:
"""
...
LATERAL VIEW JSON_TUPLE(e.myJsonBody,'field1') k AS field1,
...

"""

This looks like a platform-dependent issue; the parsing works fine on my
local computer (OS X, 1.6.3) and fails on the remote cluster (CentOS 7, 1.6.0).

I noticed that in 1.6.0, json_tuple was implemented this way:
https://github.com/apache/spark/pull/7946/files

So far I have:

   - Checked all java system properties related to charsets on drivers and
   executors
   - Turned up logging to debug level and checked for relevant messages

Any more input? Should I try the dev mailing list?


Re: Changing Spark configuration midway through application.

2016-08-10 Thread Andrew Ehrlich
If you're changing properties for the SparkContext, then I believe you will
have to start a new SparkContext with the new properties.
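
A minimal sketch of what that could look like between joins (the property names,
values, and the idea of persisting the intermediate result are only illustrations,
not something from your question):

import org.apache.spark.{SparkConf, SparkContext}

val conf1 = new SparkConf().setAppName("join-1").set("spark.executor.memory", "4g")
val sc1 = new SparkContext(conf1)
// ... run [join 1] and save the intermediate result to disk ...
sc1.stop()   // only one active SparkContext is allowed at a time

val conf2 = conf1.clone().setAppName("join-2").set("spark.executor.memory", "8g")
val sc2 = new SparkContext(conf2)
// ... read the intermediate result back and run [join 2] ...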

On Wed, Aug 10, 2016 at 8:47 AM, Jestin Ma 
wrote:

> If I run an application, for example with 3 joins:
>
> [join 1]
> [join 2]
> [join 3]
>
> [final join and save to disk]
>
> Could I change Spark properties in between each join?
>
> [join 1]
> [change properties]
> [join 2]
> [change properties]
> ...
>
> Or would I have to create a separate application with different properties
> for each of the three joins and also save each intermediate join result to
> disk?
>
> Jestin
>


Re: Tuning level of Parallelism: Increase or decrease?

2016-07-31 Thread Andrew Ehrlich
15000 seems like a lot of tasks for that size. Test it out with a .coalesce(50) 
placed right after loading the data. It will probably either run faster or 
crash with out of memory errors.
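
A minimal sketch of that experiment (the input path, format, and column name are 
placeholders):

val df = sqlContext.read.parquet("hdfs:///data/input")
  .coalesce(50)                              // merge the ~15000 input partitions without a full shuffle
val counts = df.groupBy("someKey").count()   // the rest of the job as before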

> On Jul 29, 2016, at 9:02 AM, Jestin Ma  wrote:
> 
> I am processing ~2 TB of hdfs data using DataFrames. The size of a task is 
> equal to the block size specified by hdfs, which happens to be 128 MB, 
> leading to about 15000 tasks.
> 
> I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
> I'm performing groupBy, count, and an outer-join with another DataFrame of 
> ~200 MB size (~80 MB cached but I don't need to cache it), then saving to 
> disk.
> 
> Right now it takes about 55 minutes, and I've been trying to tune it.
> 
> I read on the Spark Tuning guide that:
> In general, we recommend 2-3 tasks per CPU core in your cluster.
> 
> This means that I should have about 30-50 tasks instead of 15000, and each 
> task would be much bigger in size. Is my understanding correct, and is this 
> suggested? I've read from difference sources to decrease or increase 
> parallelism, or even keep it default.
> 
> Thank you for your help,
> Jestin



Re: How to write contents of RDD to HDFS as separate file for each item in RDD (PySpark)

2016-07-31 Thread Andrew Ehrlich
You could write each image to a different directory instead of a different 
file. That can be done by filtering the RDD into one RDD for each image and 
then saving each. That might not be what you’re after though, in terms of space 
and speed efficiency. Another way would be to save the multiple outputs into 
one parquet (or text) file. There might be information about the image you can 
partition on (probably a timestamp) to make lookups faster.
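
A rough sketch of the "filter into one RDD per file" idea (the original question is 
PySpark; this shows the same pattern in Scala, with made-up output paths):

val images = sc.binaryFiles("/root/sift_images_test/*.jpg")   // RDD[(String, PortableDataStream)]
images.map(_._1).collect().foreach { name =>
  images.filter { case (n, _) => n == name }
        .map { case (_, contents) => contents.toArray().length }   // stand-in for the real processing
        .saveAsTextFile(s"/tmp/out/${name.split('/').last}-success")
}

Note that this runs one job per input file and re-reads the glob each time, which 
is the space and speed trade-off mentioned above.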

> On Jul 30, 2016, at 8:01 PM, Bhaarat Sharma  wrote:
> 
> I am just trying to do this as a proof of concept. The actual content of the 
> files will be quite big. 
> 
> I'm having a problem using foreach or something similar on an RDD. 
> sc.binaryFiles("/root/sift_images_test/*.jpg")
> returns
> ("filename1", bytes)
> ("filname2",bytes)
> I'm wondering if there is a way to do processing on each of these (the 
> processing in this case is just getting the byte length, but it will be 
> something else in the real world) and then write the contents to separate HDFS files. 
> If this doesn't make sense, would it make more sense to have all contents in 
> a single HDFS file?
> 
> On Sat, Jul 30, 2016 at 10:19 PM, ayan guha  > wrote:
> This sounds a bad idea, given hdfs does not work well with small files.
> 
> On Sun, Jul 31, 2016 at 8:57 AM, Bhaarat Sharma  > wrote:
> I am reading bunch of files in PySpark using binaryFiles. Then I want to get 
> the number of bytes for each file and write this number to an HDFS file with 
> the corresponding name. 
> 
> Example:
> 
> if directory /myimages has one.jpg, two.jpg, and three.jpg then I want three 
> files one-success.jpg, two-success.jpg, and three-success.jpg in HDFS with a 
> number in each. The number will specify the length of bytes. 
> 
> Here is what I've done thus far:
> 
> from pyspark import SparkContext
> import numpy as np
> 
> sc = SparkContext("local", "test")
> 
> def bytes_length(rawdata):
>     length = len(np.asarray(bytearray(rawdata), dtype=np.uint8))
>     return length
> 
> images = sc.binaryFiles("/root/sift_images_test/*.jpg")
> images.map(lambda(filename, contents): 
> bytes_length(contents)).saveAsTextFile("hdfs://localhost:9000/tmp/somfile")
> 
> However, doing this creates a single file in HDFS:
> $ hadoop fs -cat /tmp/somfile/part-0
> 113212
> 144926
> 178923
> Instead I want /tmp/somefile in HDFS to have three files:
> one-success.txt with value 113212
> two-success.txt with value 144926
> three-success.txt with value 178923
> 
> Is it possible to achieve what I'm after? I don't want to write files to 
> local file system and them put them in HDFS. Instead, I want to use the 
> saveAsTextFile method on the RDD directly.
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha
> 



Re: Bzip2 to Parquet format

2016-07-24 Thread Andrew Ehrlich
You can load the text with sc.textFile() to an RDD[String], then use .map() to 
convert it into an RDD[Row]. At this point you are ready to apply a schema. Use 
sqlContext.createDataFrame(rddOfRow, structType)

Here is an example on how to define the StructType (schema) that you will 
combine with the RDD[Row] to create a DataFrame.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType
 


Once you have the DataFrame, save it with dataframe.save(“/path”) (or 
dataframe.write.parquet(“/path”) on Spark 1.4+) to create a parquet file.
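
A minimal sketch of that flow on the Spark 1.x APIs (the delimiter and the 
two-column schema below are assumptions about your data):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val lines = sc.textFile("/data/input.bz2")                 // bzip2 is decompressed transparently
val rows = lines.map(_.split(",")).map(a => Row(a(0), a(1).toInt))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("count", IntegerType, nullable = true)))
val df = sqlContext.createDataFrame(rows, schema)
df.write.parquet("/data/output.parquet")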

Reference for SQLContext / createDataFrame: 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext
 




> On Jul 24, 2016, at 5:34 PM, janardhan shetty  wrote:
> 
> We have data in Bz2 compression format. Any links in Spark to convert into 
> Parquet and also performance benchmarks and uses study materials ?



Re: Size exceeds Integer.MAX_VALUE

2016-07-24 Thread Andrew Ehrlich
You can use the .repartition() function on the rdd or dataframe to set 
the number of partitions higher. Use .partitions.length to get the current 
number of partitions. (Scala API).
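
For example (Scala shell; trainingData stands for your RDD, and the new partition 
count is arbitrary):

println(trainingData.partitions.length)            // current number of partitions
val repartitioned = trainingData.repartition(200)  // more, smaller partitions => smaller blocks per task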

Andrew

> On Jul 24, 2016, at 4:30 PM, Ascot Moss <ascot.m...@gmail.com> wrote:
> 
> the data set is the training data set for random forest training, about 
> 36,500 data,  any idea how to further partition it?  
> 
> On Sun, Jul 24, 2016 at 12:31 PM, Andrew Ehrlich <and...@aehrlich.com 
> <mailto:and...@aehrlich.com>> wrote:
> It may be this issue: https://issues.apache.org/jira/browse/SPARK-6235 
> <https://issues.apache.org/jira/browse/SPARK-6235> which limits the size of 
> the blocks in the file being written to disk to 2GB.
> 
> If so, the solution is for you to try tuning for smaller tasks. Try 
> increasing the number of partitions, or using a more space-efficient data 
> structure inside the RDD, or increasing the amount of memory available to 
> spark and caching the data in memory. Make sure you are using Kryo 
> serialization. 
> 
> Andrew
> 
>> On Jul 23, 2016, at 9:00 PM, Ascot Moss <ascot.m...@gmail.com 
>> <mailto:ascot.m...@gmail.com>> wrote:
>> 
>> 
>> Hi,
>> 
>> Please help!
>> 
>> My spark: 1.6.2
>> Java: java8_u40
>> 
>> I am trying random forest training, I got " Size exceeds Integer.MAX_VALUE".
>> 
>> Any idea how to resolve it?
>> 
>> 
>> (the log) 
>> 16/07/24 07:59:49 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 
>> 25)   
>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE  
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) 
>> at 
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
>> 
>> at 
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
>> 
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) 
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) 
>> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)  
>>
>> at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
>>
>> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)   
>>
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)  
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)  
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
>> at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)   
>> at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)   
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)   
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   
>> at java.lang.Thread.run(Thread.java:745)
>> 16/07/24 07:59:49 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 25, 
>> localhost): java.lang.IllegalArgumentException: Size exceeds 
>> Integer.MAX_VALUE   
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) 
>> at 
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
>> 
>> at 
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
>> 
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) 
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) 
>> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)  
>>
>> at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
>>
>> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)   
>>
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>> at org.apache.spark.

Re: Size exceeds Integer.MAX_VALUE

2016-07-23 Thread Andrew Ehrlich
It may be this issue: https://issues.apache.org/jira/browse/SPARK-6235 
which limits the size of the blocks in the file being written to disk to 2GB.

If so, the solution is for you to try tuning for smaller tasks. Try increasing 
the number of partitions, or using a more space-efficient data structure inside 
the RDD, or increasing the amount of memory available to spark and caching the 
data in memory. Make sure you are using Kryo serialization. 

Andrew

> On Jul 23, 2016, at 9:00 PM, Ascot Moss  wrote:
> 
> 
> Hi,
> 
> Please help!
> 
> My spark: 1.6.2
> Java: java8_u40
> 
> I am trying random forest training, I got " Size exceeds Integer.MAX_VALUE".
> 
> Any idea how to resolve it?
> 
> 
> (the log) 
> 16/07/24 07:59:49 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 25) 
>   
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE  
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
> 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
> 
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) 
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) 
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)   
>   
> at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420) 
>   
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
>   
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)   
>   
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)  
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
>   
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
>   
> at org.apache.spark.scheduler.Task.run(Task.scala:89)   
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   
> at java.lang.Thread.run(Thread.java:745)
> 16/07/24 07:59:49 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 25, 
> localhost): java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE   
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
> 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
> 
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) 
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) 
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)   
>   
> at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420) 
>   
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
>   
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)   
>   
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)  
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
>   
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
>   
> at org.apache.spark.scheduler.Task.run(Task.scala:89)   
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   
> at java.lang.Thread.run(Thread.java:745) 
> 
> 
> Regards
> 



Re: How to generate a sequential key in rdd across executors

2016-07-23 Thread Andrew Ehrlich
It’s hard to do in a distributed system. Maybe try generating a meaningful key 
using a timestamp + hashed unique key fields in the record? 
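
A rough sketch of the timestamp + hash idea (the Record type and its fields are 
made up, and the result is a unique, roughly time-ordered key rather than a 
strictly sequential one):

case class Record(id: String, payload: String)
val keyed = records.map { r =>                       // records: RDD[Record], assumed
  val key = f"${System.currentTimeMillis()}%013d-${math.abs(r.id.hashCode)}%010d"
  (key, r)
}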

> On Jul 23, 2016, at 7:53 PM, yeshwanth kumar  wrote:
> 
> Hi,
> 
> i am doing bulk load to hbase using spark,
> in which i need to generate a sequential key for each record,
> the key should be sequential across all the executors.
> 
> i tried zipWithIndex; it didn't work because it gives an index per 
> executor, not across all executors.
> 
> looking for some suggestions.
> 
> 
> Thanks,
> -Yeshwanth





Re: spark and plot data

2016-07-23 Thread Andrew Ehrlich
@Gourav, did you find any good inline plotting tools when using the Scala 
kernel? I found one based on highcharts but it was not frictionless the way 
matplotlib is.

> On Jul 23, 2016, at 2:26 AM, Gourav Sengupta  
> wrote:
> 
> Hi Pedro,
> 
> Toree is Scala kernel for Jupyter in case anyone needs a short intro. I use 
> it regularly (when I am not using IntelliJ) and its quite good.
> 
> Regards,
> Gourav
> 
> On Fri, Jul 22, 2016 at 11:15 PM, Pedro Rodriguez  > wrote:
> As of the most recent 0.6.0 release its partially alleviated, but still not 
> great (compared to something like Jupyter).
> 
> They can be "downloaded" but its only really meaningful in importing it back 
> to Zeppelin. It would be great if they could be exported as HTML or PDF, but 
> at present they can't be. I know they have some sort of git support, but it 
> was never clear to me how it was suppose to be used since the docs are sparse 
> on that. So far what works best for us is S3 storage, but you don't get the 
> benefit of Github using that (history + commits etc).
> 
> There are a couple other notebooks floating around, Apache Toree seems the 
> most promising for portability since its based on jupyter 
> https://github.com/apache/incubator-toree 
> 
> 
> On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta  > wrote:
> The biggest stumbling block to using Zeppelin has been that we cannot 
> download the notebooks, cannot export them and certainly cannot sync them 
> back to Github, without mind numbing and sometimes irritating hacks. Have 
> those issues been resolved?
> 
> 
> Regards,
> Gourav  
> 
> 
> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez  > wrote:
> Zeppelin works great. The other thing that we have done in notebooks (like 
> Zeppelin or Databricks) which support multiple types of spark session is 
> register Spark SQL temp tables in our scala code then escape hatch to python 
> for plotting with seaborn/matplotlib when the built in plots are insufficient.
> 
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
> 
> pedrorodriguez.io  | 909-353-4423 
> 
> github.com/EntilZha  | LinkedIn 
> 
> 
> On July 22, 2016 at 3:04:48 AM, Marco Colombo (ing.marco.colo...@gmail.com 
> ) wrote:
> 
>> Take a look at zeppelin
>> 
>> http://zeppelin.apache.org 
>> 
>> Il giovedì 21 luglio 2016, Andy Davidson > > ha scritto:
>> Hi Pseudo
>> 
>> Plotting, graphing, data visualization, report generation are common needs 
>> in scientific and enterprise computing.
>> 
>> Can you tell me more about your use case? What is it about the current 
>> process / workflow do you think could be improved by pushing plotting (I 
>> assume you mean plotting and graphing) into spark.
>> 
>> 
>> In my personal work all the graphing is done in the driver on summary stats 
>> calculated using spark. So for me using standard python libs has not been a 
>> problem.
>> 
>> Andy
>> 
>> From: pseudo oduesp >
>> Date: Thursday, July 21, 2016 at 8:30 AM
>> To: "user @spark" >
>> Subject: spark and plot data
>> 
>> Hi , 
>> i know spark is an engine to compute large data sets, but for me i work with 
>> pyspark and it is a very wonderful machine 
>> 
>> my question: we don't have tools for plotting data; each time we have to 
>> switch and go back to python to use plot.
>> but when you have a large result, a scatter plot or roc curve, you can't use 
>> collect to take the data.
>> 
>> does someone have a proposition for plotting?
>> 
>> thanks 
>> 
>> 
>> --
>> Ing. Marco Colombo
> 
> 
> 
> 
> -- 
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
> 
> ski.rodrig...@gmail.com  | pedrorodriguez.io 
>  | 909-353-4423
> Github: github.com/EntilZha  | LinkedIn: 
> https://www.linkedin.com/in/pedrorodriguezscience 
> 
> 
> 



Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Andrew Ehrlich
+1 for the misleading error. Messages about failing to connect often mean that 
an executor has died. If so, dig into the executor logs and find out why the 
executor died (out of memory, perhaps). 

Andrew

> On Jul 23, 2016, at 11:39 AM, VG  wrote:
> 
> Hi Pedro,
> 
> Based on your suggestion, I deployed this on a aws node and it worked fine. 
> thanks for your advice. 
> 
> I am still trying to figure out the issues on the local environment
> Anyways thanks again
> 
> -VG
> 
> On Sat, Jul 23, 2016 at 9:26 PM, Pedro Rodriguez  > wrote:
> Have you changed spark-env.sh or spark-defaults.conf from the default? It 
> looks like spark is trying to address local workers based on a network 
> address (eg 192.168……) instead of on localhost (localhost, 127.0.0.1, 
> 0.0.0.0,…). Additionally, that network address doesn’t resolve correctly. You 
> might also check /etc/hosts to make sure that you don’t have anything weird 
> going on.
> 
> Last thing to try perhaps is that are you running Spark within a VM and/or 
> Docker? If networking isn’t setup correctly on those you may also run into 
> trouble.
> 
> What would be helpful is to know everything about your setup that might 
> affect networking.
> 
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
> 
> pedrorodriguez.io  | 909-353-4423 
> 
> github.com/EntilZha  | LinkedIn 
> 
> On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com 
> ) wrote:
> 
>> Hi pedro,
>> 
>> Apologies for not adding this earlier. 
>> 
>> This is running on a local cluster set up as follows.
>> JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");
>> 
>> Any suggestions based on this ? 
>> 
>> The ports are not blocked by firewall. 
>> 
>> Regards,
>> 
>> 
>> 
>> On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez > > wrote:
>> Make sure that you don’t have ports firewalled. You don’t really give much 
>> information to work from, but it looks like the master can’t access the 
>> worker nodes for some reason. If you give more information on the cluster, 
>> networking, etc, it would help.
>> 
>> For example, on AWS you can create a security group which allows all traffic 
>> to/from itself to itself. If you are using something like ufw on ubuntu then 
>> you probably need to know the ip addresses of the worker nodes beforehand.
>> 
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>> 
>> pedrorodriguez.io  | 909-353-4423 
>> 
>> github.com/EntilZha  | LinkedIn 
>> 
>> On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com 
>> ) wrote:
>> 
>>> Please suggest if I am doing something wrong or an alternative way of doing 
>>> this. 
>>> 
>>> I have an RDD with two values as follows 
>>> JavaPairRDD rdd
>>> 
>>> When I execute   rdd..collectAsMap()
>>> it always fails with IO exceptions.   
>>> 
>>> 
>>> 16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning 
>>> fetch of 1 outstanding blocks 
>>> java.io.IOException: Failed to connect to /192.168.1.3:58179 
>>> 
>>> at 
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>>> at 
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
>>> at 
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
>>> at 
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>> at 
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>> at 
>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
>>> at 
>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
>>> at 
>>> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
>>> at 

Re: How to give name to Spark jobs shown in Spark UI

2016-07-23 Thread Andrew Ehrlich
As far as I know, the best you can do is refer to the Actions by line number.

> On Jul 23, 2016, at 8:47 AM, unk1102  wrote:
> 
> Hi I have multiple child spark jobs run at a time. Is there any way to name
> these child spark jobs so I can identify slow running ones. For e. g.
> xyz_saveAsTextFile(),  abc_saveAsTextFile() etc please guide. Thanks in
> advance. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-give-name-to-Spark-jobs-shown-in-Spark-UI-tp27400.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 





Re: Spark Job trigger in production

2016-07-19 Thread Andrew Ehrlich
Another option is Oozie with the spark action: 
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html 
 

> On Jul 18, 2016, at 12:15 AM, Jagat Singh  wrote:
> 
> You can use following options
> 
> * spark-submit from shell 
> * some kind of job server. See spark-jobserver for details
> * some notebook environment See Zeppelin for example
> 
> 
> 
> 
> 
> On 18 July 2016 at 17:13, manish jaiswal  > wrote:
> Hi,
> 
> 
> What is the best approach to trigger spark job in production cluster?
> 



Re: the spark job is so slow - almost frozen

2016-07-19 Thread Andrew Ehrlich
Try:

- filtering down the data as soon as possible in the job, dropping columns you 
don’t need (see the sketch after this list)
- processing fewer partitions of the hive tables at a time
- caching frequently accessed data, for example dimension tables, lookup 
tables, or other datasets that are repeatedly accessed
- using the Spark UI to identify the bottlenecked resource
- removing features or columns from the output data until it runs, then adding 
them back in one at a time
- creating a static dataset small enough to work, editing the query, then 
retesting, repeatedly, until you cut the execution time by a significant fraction
- using the Spark UI or spark shell to check the skew and make sure partitions 
are evenly distributed
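
A minimal sketch of the first few points (Scala with a HiveContext assumed; every 
table and column name below is made up):

val facts = hiveContext.table("db.fact_table")
  .select("key", "amount", "dt")          // drop columns you don't need as early as possible
  .where("dt = '2016-07-18'")             // read fewer Hive partitions per run
val dims = hiveContext.table("db.dim_table").cache()   // small lookup table, reused across joins
val joined = facts.join(dims, "key")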

> On Jul 18, 2016, at 3:33 AM, Zhiliang Zhu  wrote:
> 
> Thanks a lot for your reply .
> 
> In effect, here we tried to run the sql on kettle, hive, and spark hive (by 
> HiveContext) respectively; the job seems frozen and never finishes running.
> 
> In the 6 tables , need to respectively read the different columns in 
> different tables for specific information , then do some simple calculation 
> before output . 
> join operation is used most in the sql . 
> 
> Best wishes! 
> 
> 
> 
> 
> On Monday, July 18, 2016 6:24 PM, Chanh Le  wrote:
> 
> 
> Hi,
> What about the network (bandwidth) between hive and spark? 
> Does it run in Hive before then you move to Spark?
> Because It's complex you can use something like EXPLAIN command to show what 
> going on.
> 
> 
> 
> 
>  
>> On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu > > wrote:
>> 
>> the sql logic in the program is very much complex , so do not describe the 
>> detailed codes   here . 
>> 
>> 
>> On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu > > wrote:
>> 
>> 
>> Hi All,  
>> 
>> Here we have one application, it needs to extract different columns from 6 
>> hive tables, and then does some easy calculation, there is around 100,000 
>> number of rows in each table,
>> finally need to output another table or file (with format of consistent 
>> columns) .
>> 
>>  However, after lots of days trying, the spark hive job is unthinkably slow 
>> - sometimes almost frozen. There are 5 nodes in the spark cluster. 
>>  
>> Could anyone offer some help, some idea or clue is also good. 
>> 
>> Thanks in advance~
>> 
>> Zhiliang 
>> 
>> 
> 
> 
> 



Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-19 Thread Andrew Ehrlich
There is a Spark<->HBase library that does this.  I used it once in a prototype 
(never tried it in production, though): 
http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/
 


> On Jul 19, 2016, at 9:34 AM, Yu Wei  wrote:
> 
> Hi guys,
> 
> I write spark application and want to store results generated by spark 
> application to hbase.
> Do I need to access hbase via java api directly? 
> Or is it a better choice to use a DAO similar to a traditional RDBMS?  I suspect 
> that there is a major performance downgrade and other negative impacts from using 
> a DAO. However, I have little knowledge in this field.
> 
> Any advice?
> 
> Thanks,
> Jared



Re: Heavy Stage Concentration - Ends With Failure

2016-07-19 Thread Andrew Ehrlich
Yea this is a good suggestion; also check 25th percentile, median, and 75th 
percentile to see how skewed the input data is.

If you find that the RDD’s partitions are skewed, you can solve it either by 
changing the partitioner when you read the files, as already suggested, or by 
calling repartition() on the RDD before the bottleneck to redistribute the 
data amongst the partitions by executing a shuffle.
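
For example (Scala shell; rdd stands for whatever feeds the bottlenecked stage, 
and the partition count below is arbitrary):

val sizes = rdd.mapPartitions(it => Iterator(it.size)).collect().sorted
println(s"min=${sizes.head} median=${sizes(sizes.length / 2)} max=${sizes.last}")
// If the spread is large, repartition() forces a shuffle that evens the data out:
val balanced = rdd.repartition(120)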

> On Jul 19, 2016, at 6:19 PM, Kuchekar  wrote:
> 
> Hi,
> 
> Can you check if the RDD is partitioned correctly with correct partition 
> number (if you are manually setting the partition value.) . Try using Hash 
> partitioner while reading the files.
> 
> One way you can debug is by checking the number of records that executor has 
> compared to others in the Stage tab of the Spark UI.
> 
> Kuchekar, Nilesh
> 
> On Tue, Jul 19, 2016 at 8:16 PM, Aaron Jackson  > wrote:
> Hi,
> 
> I have a cluster with 15 nodes of which 5 are HDFS nodes.  I kick off a job 
> that creates some 120 stages.  Eventually, the active and pending stages 
> reduce down to a small bottleneck and it never fails... the tasks associated 
> with the 10 (or so) running tasks are always allocated to the same executor 
> on the same host.
> 
> Sooner or later, it runs out of memory ... or some other resource.  It falls 
> over and then they tasks are reallocated to another executor.
> 
> Why do we see such heavy concentration of tasks onto a single executor when 
> other executors are free?  Were the tasks assigned to an executor when the 
> job was decomposed into stages?
> 



Re: spark worker continuously trying to connect to master and failed in standalone mode

2016-07-19 Thread Andrew Ehrlich
Troubleshooting steps:

$ telnet localhost 7077 (on the master, to confirm the port is open)
$ telnet <master-ip> 7077 (on the slave, to check whether the port is reachable)

If the port is available on the master from the master, but not on the master 
from the slave, check firewall settings on the master: 
https://help.ubuntu.com/lts/serverguide/firewall.html 

> On Jul 19, 2016, at 6:25 PM, Neil Chang  wrote:
> 
> Hi,
>   I have two virtual pcs on private cloud (ubuntu 14). I installed spark 2.0 
> preview on both machines. I then tried to test it with standalone mode.
> I have no problem start the master. However, when I start the worker (slave) 
> on another machine, it makes many attempts to connect to master and failed at 
> the end. 
>   I can ssh from each machine to another without any problem. I can also run 
> a master and worker at the same machine without any problem.
> 
> What did I miss? Any clue?
> 
> here are the messages:
> 
> WARN NativeCodeLoader: Unable to load native-hadoop library for your platform 
> ... using builtin-java classes where applicable
> ..
> INFO Worker: Connecting to master ip:7077 ... 
> INFO Worker: Retrying connection to master (attempt #1)
> ..
> INFO Worker: Retrying connection to master (attempt #7)
> java.lang.IllegalArgumentException: requirement failed: TransportClient has 
> not yet been set.
>at scala.Predef$.require(Predef.scala:224)
> ...
> WARN NettyRocEnv: Ignored failure: java.io.IOException: Connecting to ip:7077 
> timed out
> WARN Worker: Failed to connect to master ip.7077
> 
> 
> 
> Thanks,
> Neil



Re: Building standalone spark application via sbt

2016-07-19 Thread Andrew Ehrlich
Yes, spark-core will depend on Hadoop and several other jars.  Here’s the list 
of dependencies: https://github.com/apache/spark/blob/master/core/pom.xml#L35 


Whether you need spark-sql depends on whether you will use the DataFrame API. 
Without spark-sql, you will just have the RDD API.
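
A minimal build.sbt sketch for that setup (the project name and versions are 
placeholders; sbt already treats every jar under lib/ as an unmanaged dependency, 
the last line just makes the default explicit):

name := "my-spark-app"
version := "0.1"
scalaVersion := "2.10.6"
unmanagedBase := baseDirectory.value / "lib"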

> On Jul 19, 2016, at 7:09 AM, Sachin Mittal  wrote:
> 
> 
> Hi,
> Can someone please guide me what all jars I need to place in my lib folder of 
> the project to build a standalone scala application via sbt.
> 
> Note I need to provide static dependencies and I cannot download the jars 
> using libraryDependencies.
> So I need to provide all the jars upfront.
> 
> So far I found that we need:
> spark-core_.jar
> 
> Do we also need
> spark-sql_.jar
> and
> hadoop-core-.jar
> 
> Is there any jar from spark side I may be missing? What I found that 
> spark-core needs hadoop-core classes and if I don't add them then sbt was 
> giving me this error:
> [error] bad symbolic reference. A signature in SparkContext.class refers to 
> term hadoop
> [error] in package org.apache which is not available.
> 
> So I was just confused on library dependency part when building an 
> application via sbt. Any inputs here would be helpful.
> 
> Thanks
> Sachin
> 
> 
> 



Re: Spark performance testing

2016-07-08 Thread Andrew Ehrlich
Yea, I'm looking for any personal experiences people have had with tools like 
these. 

> On Jul 8, 2016, at 8:57 PM, charles li <charles.up...@gmail.com> wrote:
> 
> Hi, Andrew, I've got lots of materials when asking google for "spark 
> performance test"
> 
> https://github.com/databricks/spark-perf
> https://spark-summit.org/2014/wp-content/uploads/2014/06/Testing-Spark-Best-Practices-Anupama-Shetty-Neil-Marshall.pdf
> http://people.cs.vt.edu/~butta/docs/tpctc2015-sparkbench.pdf
> 
> 
>> On Sat, Jul 9, 2016 at 11:40 AM, Andrew Ehrlich <and...@aehrlich.com> wrote:
>> Hi group,
>> 
>> What solutions are people using to do performance testing and tuning of 
>> spark applications? I have been doing a pretty manual technique where I lay 
>> out an Excel sheet of various memory settings and caching parameters and 
>> then execute each one by hand. It’s pretty tedious though, so I’m wondering 
>> what others do, and if you do performance testing at all.  Also, is anyone 
>> generating test data, or just operating on a static set? Is regression 
>> testing for performance a thing?
>> 
>> Andrew
> 
> 
> 
> -- 
> ___
> Quant | Engineer | Boy
> ___
> blog:http://litaotao.github.io
> github: www.github.com/litaotao


Spark performance testing

2016-07-08 Thread Andrew Ehrlich
Hi group,

What solutions are people using to do performance testing and tuning of spark 
applications? I have been doing a pretty manual technique where I lay out an 
Excel sheet of various memory settings and caching parameters and then execute 
each one by hand. It’s pretty tedious though, so I’m wondering what others do, 
and if you do performance testing at all.  Also, is anyone generating test 
data, or just operating on a static set? Is regression testing for performance 
a thing?

Andrew



Re: never understand

2016-05-25 Thread Andrew Ehrlich
- Try doing less in each transformation
- Try using different data structures within the transformations
- Try not caching anything to free up more memory


On Wed, May 25, 2016 at 1:32 AM, pseudo oduesp 
wrote:

> hi guys ,
> -i get this errors with pyspark 1.5.0 under cloudera CDH 5.5 (yarn)
>
> -i use yarn to deploy job on cluster.
> -i use hive context  and parquet file to save my data.
> limit container 16 GB
> number of executor i tested befor it s 12 GB (executor memory)
> -i tested increasing the number of partitions (by default it is 200); i
> multiplied it by 2 and 3 without success.
>
> -I tried to change the number of sql shuffle partitions
>
>
> -i noticed in the spark UI that when shuffle write is triggered there is no
> problem, but when shuffle read is triggered i lose executors and get errors
>
>
>
> and i am really blocked by this error; where does it come from?
>
>
>
>
>  ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread
> Thread[Executor task launch worker-5,5,main]
> java.lang.OutOfMemoryError: Java heap space
> at
> parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
> at parquet.column.values.dictionary.IntList.(IntList.java:86)
> at
> parquet.column.values.dictionary.DictionaryValuesWriter.(DictionaryValuesWriter.java:93)
> at
> parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.(DictionaryValuesWriter.java:229)
> at
> parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
> at
> parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
> at
> parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
> at
> parquet.column.impl.ColumnWriterV1.(ColumnWriterV1.java:84)
> at
> parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
> at
> parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
> at
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:207)
> at
> parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:405)
> at
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:107)
> at
> parquet.hadoop.InternalParquetRecordWriter.(InternalParquetRecordWriter.java:97)
> at
> parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:100)
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:326)
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272)
> at
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:233)
>  at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:405)
> at
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:107)
> at
> parquet.hadoop.InternalParquetRecordWriter.(InternalParquetRecordWriter.java:97)
> at
> parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:100)
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:326)
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272)
> at
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:233)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 16/05/25 09:54:42 ERROR util.SparkUncaughtExceptionHandler: Uncaught
> exception in thread Thread[Executor task launch worker-6,5,main]
> java.lang.OutOfMemoryError: Java heap space
> at
> parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
>   

Re: subtractByKey increases RDD size in memory - any ideas?

2016-02-18 Thread Andrew Ehrlich
There could be clues in the different RDD subclasses; rdd1 is
ParallelCollectionRDD but rdd3 is SubtractedRDD.

On Thu, Feb 18, 2016 at 1:37 PM, DaPsul  wrote:

> (copy from
>
> http://stackoverflow.com/questions/35467128/spark-subtractbykey-increases-rdd-cached-memory-size
> )
>
> I've found a very strange behavior for RDD's (spark 1.6.0 with scala 2.11):
>
> When i use subtractByKey on an RDD the resulting RDD should be of equal or
> smaller size. What i get is an RDD that takes even more space in memory:
>
> //Initialize first RDD
> val rdd1 = sc.parallelize(Array((1,1),(2,2),(3,3))).cache()
>
> //dummy action to cache it => size according to webgui: 184 Bytes
> rdd1.first
>
> //Initialize RDD to subtract (empty RDD should result in no change for
> rdd1)
> val rdd2 = sc.parallelize(Array[(Int,Int)]())
>
> //perform subtraction
> val rdd3 = rdd1.subtractByKey(rdd2).cache()
>
> //dummy action to cache rdd3 => size according to webgui: 208 Bytes
> rdd3.first
>
> I first realized this strange behaviour for an RDD of ~200k rows and size
> 1.3 GB that scaled up to more than 2 GB after subtraction
>
> Edit: Tried the example above with more values(10k) => same behaviour. The
> size increases by ~1.6 times. Also reduceByKey seems to have a similar
> effect.
>
> When i create an RDD by
>
> sc.paralellize(rdd3.collect())
>
> the size is the same as for rdd3, so the increased size carries over even
> if
> it's extracted from RDD.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/subtractByKey-increases-RDD-size-in-memory-any-ideas-tp26272.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Hive REGEXP_REPLACE use or equivalent in Spark

2016-02-18 Thread Andrew Ehrlich
Use the Scala method .split(",") to split the string into a collection of
strings, and try using .replaceAll() on the field containing the "?" to remove it.
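
A minimal sketch in the Scala shell, using the sample line from your mail. Note 
that the quoted money fields contain commas, so the split pattern below only 
splits on commas outside double quotes; a real CSV parser is another option.

val line = "360,10/02/2014,\"?2,500.00\",?0.00,\"?2,500.00\""
val fields = line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")
val amount = fields(2).replaceAll("[^\\d\\.]", "")   // "2500.00", like the Hive REGEXP_REPLACE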

On Thu, Feb 18, 2016 at 2:09 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> What is the equivalent of this Hive statement in Spark
>
>
>
> select "?2,500.00", REGEXP_REPLACE("?2,500.00",'[^\\d\\.]','');
> ++--+--+
> |_c0 |   _c1|
> ++--+--+
> | ?2,500.00  | 2500.00  |
> ++--+--+
>
> Basically I want to get rid of "?" and "," in the csv file
>
>
>
> The full csv line is
>
>
>
> scala> csv2.first
> res94: String = 360,10/02/2014,"?2,500.00",?0.00,"?2,500.00"
>
> I want to transform that string into 5 columns and use "," as the split
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>


Re: send transformed RDD to s3 from slaves

2015-11-14 Thread Andrew Ehrlich
Maybe you want to be using rdd.saveAsTextFile() ?
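
A minimal sketch of that (the transformation and the bucket path are placeholders, 
and S3 credentials are assumed to be configured for Hadoop):

val transformed = rdd.map(record => record.toString.toUpperCase)  // stand-in for the real chain
transformed.saveAsTextFile("s3n://my-bucket/output/")             // each executor writes its own partitions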

> On Nov 13, 2015, at 4:56 PM, Walrus theCat  wrote:
> 
> Hi,
> 
> I have an RDD which crashes the driver when being collected.  I want to send 
> the data on its partitions out to S3 without bringing it back to the driver. 
> I try calling rdd.foreachPartition, but the data that gets sent has not gone 
> through the chain of transformations that I need.  It's the data as it was 
> ingested initially.  After specifying my chain of transformations, but before 
> calling foreachPartition, I call rdd.count in order to force the RDD to 
> transform.  The data it sends out is still not transformed.  How do I get the 
> RDD to send out transformed data when calling foreachPartition?
> 
> Thanks


