Re: zip for pyspark

2016-08-08 Thread Ewan Leith
If you build a normal Python egg file containing the dependencies, you can submit 
it alongside your main .py file using --py-files
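
For example (the file names here are made up), the submit command would look roughly like:

spark-submit --master yarn --py-files dependencies.egg,extra_libs.zip main_job.py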

Thanks,
Ewan

On 8 Aug 2016 3:44 p.m., pseudo oduesp  wrote:
hi,
how can I export a whole PySpark project as a zip from my local session to the 
cluster and deploy it with spark-submit? I mean, I have a large project with all its 
dependencies, and I want to create a zip containing all of the dependencies and deploy it on 
the cluster



Re: Spark 2.0.0 - Apply schema on few columns of dataset

2016-08-07 Thread Ewan Leith
Looking at the encoders api documentation at

http://spark.apache.org/docs/latest/api/java/

== Java == Encoders are specified by calling static methods on 
Encoders.

List<String> data = Arrays.asList("abc", "abc", "xyz");
Dataset<String> ds = context.createDataset(data, Encoders.STRING());

I think you should be calling

.as((Encoders.STRING(), Encoders.STRING()))

or similar

Ewan

On 8 Aug 2016 06:10, Aseem Bansal  wrote:
Hi All

Has anyone done this with Java API?

On Fri, Aug 5, 2016 at 5:36 PM, Aseem Bansal 
> wrote:
I need to use a few columns out of a CSV. But as there is no option to read only a few 
columns out of the CSV:
 1. I am reading the whole CSV using SparkSession.csv()
 2. selecting a few of the columns using DataFrame.select()
 3. applying the schema using the .as() function of Dataset. I tried to 
extend org.apache.spark.sql.Encoder as the input for the as function

But I am getting the following exception

Exception in thread "main" java.lang.RuntimeException: Only expression encoders 
are supported today

So my questions are:
1. Is it possible to read a few columns instead of the whole CSV? I cannot change the 
CSV as that is upstream data.
2. How do I apply a schema to a few columns if I cannot write my own encoder?




RE: how to save spark files as parquets efficiently

2016-07-29 Thread Ewan Leith
If you replace the df.write …. with

df.count()

in your code, you’ll see how much time is taken to process the full execution 
plan without the write output.
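
For example, a rough sketch of timing the two stages separately (just illustrative):

// time the execution plan alone
val t0 = System.nanoTime()
df.count()
println(s"count took ${(System.nanoTime() - t0) / 1e9} seconds")

// then time the plan plus the parquet write
val t1 = System.nanoTime()
df.write.format("parquet").mode("overwrite").save(hdfspathTemp)
println(s"write took ${(System.nanoTime() - t1) / 1e9} seconds")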

That code below looks perfectly normal for writing a parquet file, yes; there 
shouldn’t be any tuning needed for “normal” performance.

Thanks,
Ewan

From: Sumit Khanna [mailto:sumit.kha...@askme.in]
Sent: 29 July 2016 13:41
To: Gourav Sengupta 
Cc: user 
Subject: Re: how to save spark files as parquets efficiently

Hey Gourav,

Well, I think it is my execution plan that is at fault. Basically df.write, being an 
action, will include the time taken for all the umpteen transformations before it (as 
shown for the Spark job on localhost:4040), right? All I wanted to know is what 
env/config params are apt for something simple: read a dataframe from parquet and 
save it back as another parquet (meaning vanilla load/store, no transformation). 
Is it good enough to simply read and write in the very format mentioned in the Spark 
tutorial docs, i.e.

df.write.format("parquet").mode("overwrite").save(hdfspathTemp) ??

Thanks,

On Fri, Jul 29, 2016 at 4:22 PM, Gourav Sengupta 
> wrote:
Hi,

The default write format in SPARK is parquet, and I have never faced any issues 
writing over a billion records in SPARK. Are you using virtualization by any 
chance, or an obsolete hard disk, or an Intel Celeron maybe?
Regards,
Gourav Sengupta

On Fri, Jul 29, 2016 at 7:27 AM, Sumit Khanna 
> wrote:
Hey,

master=yarn
mode=cluster

spark.executor.memory=8g
spark.rpc.netty.dispatcher.numThreads=2

All the POC is on a single-node cluster. The biggest bottleneck being:

1.8 hrs to save 500k records as a parquet file/dir executing this command:


df.write.format("parquet").mode("overwrite").save(hdfspathTemp)

No doubt, the whole execution plan gets triggered on this write / save action. 
But is it the right command / set of params to save a dataframe?

Essentially I am doing an upsert by pulling in data from HDFS and then updating 
it with the delta changes of the current run. But I'm not sure if the write itself 
takes that much time or if some optimization is needed for the upsert. (I have asked 
that as another question altogether.)

Thanks,
Sumit





RE: Role-based S3 access outside of EMR

2016-07-21 Thread Ewan Leith
If you use S3A rather than S3N, it supports IAM roles.

I think you can make s3a be used for s3:// style URLs, so it’s consistent with your 
EMR paths, by adding this to your Hadoop config, probably in core-site.xml:

fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A
fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A

And make sure the s3a jars are in your classpath
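
If it’s easier than editing core-site.xml, a rough sketch of setting the same properties 
at runtime on the SparkContext’s Hadoop configuration (the bucket path below is made up):

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.s3.impl", "org.apache.hadoop.fs.s3a.S3A")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")

// with an IAM role attached to the instance, reads should then work without embedding keys
val df = sqlContext.read.parquet("s3://my-bucket/some/path")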

Thanks,
Ewan

From: Everett Anderson [mailto:ever...@nuna.com.INVALID]
Sent: 21 July 2016 17:01
To: Gourav Sengupta 
Cc: Teng Qiu ; Andy Davidson 
; user 
Subject: Re: Role-based S3 access outside of EMR

Hey,

FWIW, we are using EMR, actually, in production.

The main case I have for wanting to access S3 with Spark outside of EMR is that 
during development, our developers tend to run EC2 sandbox instances that have 
all the rest of our code and access to some of the input data on S3. It'd be 
nice if S3 access "just worked" on these without storing the access keys in an 
exposed manner.

Teng -- when you say you use EMRFS, does that mean you copied AWS's EMRFS JAR 
from an EMR cluster and are using it outside? My impression is that AWS hasn't 
released the EMRFS implementation as part of the aws-java-sdk, so I'm wary of 
using it. Do you know if it's supported?


On Thu, Jul 21, 2016 at 2:32 AM, Gourav Sengupta 
> wrote:
Hi Teng,
This is totally flash news for me that people cannot use EMR in 
production because it's not open sourced; I think that even Werner is not aware 
of such a problem. Is EMRFS open sourced? I am also curious: what does HA 
stand for?
Regards,
Gourav

On Thu, Jul 21, 2016 at 8:37 AM, Teng Qiu 
> wrote:
there are several reasons that AWS users do not (or cannot) use EMR. One
point for us is a security compliance problem: EMR is totally not
open sourced, so we cannot use it in a production system. Second is that
EMR does not support HA yet.

but to the original question from @Everett :

-> Credentials and Hadoop Configuration

as you said, best practice should be to "rely on machine roles"; these are
called IAM roles.

we are using the EMRFS impl for accessing s3, and it supports IAM role-based
access control well. you can take a look here:
https://github.com/zalando/spark/tree/branch-1.6-zalando

or simply use our docker image (Dockerfile on github:
https://github.com/zalando/spark-appliance/tree/master/Dockerfile)

docker run -d --net=host \
   -e START_MASTER="true" \
   -e START_WORKER="true" \
   -e START_WEBAPP="true" \
   -e START_NOTEBOOK="true" \
   
registry.opensource.zalan.do/bi/spark:1.6.2-6


-> SDK and File System Dependencies

as mentioned above, using EMRFS libs solved this problem:
http://docs.aws.amazon.com//ElasticMapReduce/latest/ReleaseGuide/emr-fs.html


2016-07-21 8:37 GMT+02:00 Gourav Sengupta 
>:
> But that would mean you would be accessing data over the internet, increasing
> data read latency and data transmission failures. Why are you not using EMR?
>
> Regards,
> Gourav
>
> On Thu, Jul 21, 2016 at 1:06 AM, Everett Anderson 
> >
> wrote:
>>
>> Thanks, Andy.
>>
>> I am indeed often doing something similar, now -- copying data locally
>> rather than dealing with the S3 impl selection and AWS credentials issues.
>> It'd be nice if it worked a little easier out of the box, though!
>>
>>
>> On Tue, Jul 19, 2016 at 2:47 PM, Andy Davidson
>> > wrote:
>>>
>>> Hi Everett
>>>
>>> I always do my initial data exploration and all our product development
>>> in my local dev env. I typically select a small data set and copy it to my
>>> local machine
>>>
>>> My main() has an optional command line argument ‘--runLocal’. Normally I
>>> load data from either hdfs:/// or S3n:// . If the arg is set I read from
>>> file:///
>>>
>>> Sometimes I use a CLI arg ‘--dataFileURL’
>>>
>>> So in your case I would log into my data cluster and use “AWS s3 cp" to
>>> copy the data into my cluster and then use “SCP” to copy the data from the
>>> data center back to my local env.
>>>
>>> Andy
>>>
>>> From: Everett Anderson 
>>> >
>>> Date: Tuesday, July 19, 2016 at 2:30 PM
>>> To: "user @spark" >
>>> Subject: Role-based S3 access outside of EMR
>>>
>>> Hi,
>>>
>>> When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop
>>> FileSystem implementation for s3:// URLs and 

RE: is dataframe.write() async? Streaming performance problem

2016-07-08 Thread Ewan Leith
Writing (or reading) small files from spark to s3 can be seriously slow.

You'll get much higher throughput by doing a df.foreachPartition(partition => 
...) and inside each partition, creating an aws s3 client then doing a 
partition.foreach and uploading the files using that s3 client with its own 
threadpool.

As long as you create the s3 client inside the foreachPartition, and close it 
after the partition.foreach(...) is done, you shouldn't have any issues.

Something roughly like this from the DStream docs:

  df.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
  }
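
A slightly fuller sketch of that pattern in Scala, using the plain AWS Java SDK client 
(the bucket name and key layout are made up, so treat it as a starting point rather 
than tested code):

import java.io.ByteArrayInputStream
import java.util.UUID
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectMetadata

df.toJSON.foreachPartition { partition =>
  // one client per partition, created on the executor; it picks up credentials or an IAM role from the environment
  val s3Client = new AmazonS3Client()
  partition.foreach { record =>
    val bytes = record.getBytes("UTF-8")
    val metadata = new ObjectMetadata()
    metadata.setContentLength(bytes.length)
    // bucket and key naming are purely illustrative
    s3Client.putObject("my-bucket", s"output/${UUID.randomUUID()}.json", new ByteArrayInputStream(bytes), metadata)
  }
  s3Client.shutdown()
}

The important part is that the client is created and closed inside foreachPartition, so 
nothing non-serialisable has to be shipped from the driver.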

Hope this helps,
Ewan

-Original Message-
From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: 08 July 2016 15:31
To: Andy Davidson 
Cc: user @spark 
Subject: Re: is dataframe.write() async? Streaming performance problem

Maybe obvious, but what happens when you change the s3 write to a println of 
all the data?  That should identify whether it's the issue.

count() and read.json() will involve additional tasks (run through the items in 
the rdd to count them, likewise to infer the schema) but for
300 records that shouldn't be much of an issue.

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson  
wrote:
> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using 
> kafka direct stream approach. I am running into performance problems. 
> My processing time is > than my window size. Changing window sizes, 
> adding cores and executor memory does not change performance. I am 
> having a lot of trouble identifying the problem by looking at the metrics 
> provided for streaming apps in the spark application web UI.
>
> I think my performance problem has to with writing the data to S3.
>
> My app receives very complicated JSON. My program is simple: it sorts 
> the data into a small set of sets and writes each set as a separate S3 object.
> The mini batch data has at most 300 events so I do not think shuffle 
> is an issue.
>
> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>
> … Explode tagCol …
>
>
> DataFrame rulesDF = activityDF.select(tagCol).distinct();
>
> Row[] rows = rulesDF.select(tagCol).collect();
>
> List tags = new ArrayList(100);
>
> for (Row row : rows) {
>
> Object tag = row.get(0);
>
> tags.add(tag.toString());
>
> }
>
>
> I think the for loop below is where the bottleneck is. Is write() async?
>
>
> If not, is there an easy way to vectorize/parallelize this for loop or 
> do I have to create the threads myself?
>
>
> Is creating threads in spark a bad idea?
>
>
>
> for(String tag : tags) {
>
> DataFrame saveDF = 
> activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>
> if (saveDF.count() >= 1) { // I do not think count() is an issue 
> performance is about 34 ms
>
> String dirPath = "s3n://myBucket" + File.separator + date + 
> File.separator + tag + File.separator + milliSeconds;
>
> saveDF.write().json(dirPath);
>
> }
>
> }
>
>
> Any suggestions would be greatly appreciated
>
>
> Andy
>
>




Re: Spark SQL Nested Array of JSON with empty field

2016-06-05 Thread Ewan Leith
The spark json read is unforgiving of things like missing elements from some 
json records, or mixed types.

If you want to pass invalid json files through spark, you're best doing an 
initial parse through the Jackson APIs using a defined schema first. You can then 
set types like Option[String] where a column is optional, convert the validated 
records back into a new string variable, and read that string as a dataframe.
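
A rough sketch of that idea, simplified to just dropping records Jackson can't parse 
rather than the full defined-schema approach (the path is illustrative):

import scala.util.Try
import com.fasterxml.jackson.databind.ObjectMapper

val rawLines = sc.textFile("/path/to/raw-json")

// create one mapper per partition so it isn't serialised from the driver
val validJson = rawLines.mapPartitions { lines =>
  val mapper = new ObjectMapper()
  lines.filter(line => Try(mapper.readTree(line)).isSuccess)
}

val df = sqlContext.read.json(validJson)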

Thanks,
Ewan

On 3 Jun 2016 22:03, Jerry Wong  wrote:
Hi,

I ran into a problem with an empty field in a nested JSON file with Spark SQL. For 
instance, there are two records in a JSON file as follows:

{
"firstname": "Jack",
"lastname": "Nelson",
"address": {
"state": "New York",
"city": "New York"
}
}{
"firstname": "Landy",
"middlename": "Ken",
"lastname": "Yong",
"address": {
"state": "California",
"city": "Los Angles"
}
}

I use Spark SQL to get the files like,
val row = sqlContext.sql("SELECT firstname, middlename, lastname, 
address.state, address.city FROM jsontable")
The compiler will tell me the error for line 1: no "middlename".
How do I handle this case in Spark SQL?

Many thanks in advance!
Jerry




RE: Timed aggregation in Spark

2016-05-23 Thread Ewan Leith
Rather than open a connection per record, if you do a DStream foreachRDD at the 
end of a 5 minute batch window

http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

then you can do an rdd.foreachPartition to get the RDD partitions. Open a 
connection to Vertica (or a pool of them) inside that foreachPartition, then do a 
partition.foreach to write each element from that partition to Vertica, before 
finally closing the pool of connections.
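
A minimal sketch of that shape using a plain JDBC connection, assuming the windowed 
stream (called aggregatedStream here, which is illustrative) contains (String, Long) 
pairs; the Vertica URL and table are made up, and you'd swap in a proper connection 
pool in practice:

import java.sql.DriverManager

aggregatedStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // one connection per partition, opened on the executor
    val conn = DriverManager.getConnection("jdbc:vertica://vertica-host:5433/mydb", "user", "password")
    val stmt = conn.prepareStatement("INSERT INTO aggregates (key, total) VALUES (?, ?)")
    partition.foreach { case (key, total) =>
      stmt.setString(1, key)
      stmt.setLong(2, total)
      stmt.addBatch()
    }
    stmt.executeBatch()
    stmt.close()
    conn.close()
  }
}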

Hope this helps,
Ewan

From: Nikhil Goyal [mailto:nownik...@gmail.com]
Sent: 23 May 2016 21:55
To: Ofir Kerker 
Cc: user@spark.apache.org
Subject: Re: Timed aggregation in Spark

I don't think this is solving the problem. So here are the issues:
1) How do we push the entire data to Vertica? Opening a connection per record will 
be too costly.
2) If a key doesn't come again, how do we push it to Vertica?
3) How do we schedule the dumping of data to avoid loading too much data in 
state.



On Mon, May 23, 2016 at 1:33 PM, Ofir Kerker 
> wrote:
Yes, check out mapWithState:
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html

_
From: Nikhil Goyal >
Sent: Monday, May 23, 2016 23:28
Subject: Timed aggregation in Spark
To: >


Hi all,

I want to aggregate my data for 5-10 min and then flush the aggregated data to 
some database like Vertica. updateStateByKey is not exactly helpful in this 
scenario as I can't flush all the records at once, nor can I clear the 
state. I wanted to know if anyone else has faced a similar issue and how they 
handled it.

Thanks
Nikhil




Spark Streaming - Exception thrown while writing record: BlockAdditionEvent

2016-05-23 Thread Ewan Leith
As we increase the throughput on our Spark streaming application, we're finding 
we hit errors with the WriteAheadLog, with errors like this:

16/05/21 20:42:21 WARN scheduler.ReceivedBlockTracker: Exception thrown while 
writing record: 
BlockAdditionEvent(ReceivedBlockInfo(0,Some(10),None,WriteAheadLogBasedStoreResult(input-0-1463850002991,Some(10),FileBasedWriteAheadLogSegment(hdfs://x.x.x.x:8020/checkpoint/receivedData/0/log-1463863286930-1463863346930,625283,39790
 to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 
milliseconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
 at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
 at 
org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
 at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
 at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
 at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
16/05/21 20:42:26 WARN scheduler.ReceivedBlockTracker: Exception thrown while 
writing record: 
BlockAdditionEvent(ReceivedBlockInfo(1,Some(10),None,WriteAheadLogBasedStoreResult(input-1-1462971836350,Some(10),FileBasedWriteAheadLogSegment(hdfs://x.x.x.x:8020/checkpoint/receivedData/1/log-1463863313080-1463863373080,455191,60798
 to the WriteAheadLog.

I've found someone else on StackOverflow with the same issue, who's suggested 
increasing the spark.streaming.driver.writeAheadLog.batchingTimeout setting, 
but we're not actually seeing significant performance issues on HDFS when the 
issue occurs.

http://stackoverflow.com/questions/34879092/reliability-issues-with-checkpointing-wal-in-spark-streaming-1-6-0

Has anyone else come across this, and any suggested areas we can look at?

Thanks,
Ewan


RE: Spark 1.6.0: substring on df.select

2016-05-12 Thread Ewan Leith
You could use a UDF pretty easily, something like this should work, the 
lastElement function could be changed to do pretty much any string manipulation 
you want.

import org.apache.spark.sql.functions.udf
import sqlContext.implicits._  // for the $"col1" column syntax

// return everything after the final "/" in the input string
def lastElement(input: String) = input.split("/").last

val lastElementUdf = udf(lastElement(_: String))

df.select(lastElementUdf($"col1")).show()

Ewan


From: Bharathi Raja [mailto:raja...@yahoo.com.INVALID]
Sent: 12 May 2016 11:40
To: Raghavendra Pandey ; Bharathi Raja 

Cc: User 
Subject: RE: Spark 1.6.0: substring on df.select

Thanks Raghav.

I have 5+ million records. I feel creating multiple columns is not an optimal way.

Please suggest any other alternate solution.
Can’t we do any string operation in DF.Select?

Regards,
Raja

From: Raghavendra Pandey
Sent: 11 May 2016 09:04 PM
To: Bharathi Raja
Cc: User
Subject: Re: Spark 1.6.0: substring on df.select


You can create a column with count of /.  Then take max of it and create that 
many columns for every row with null fillers.

Raghav
On 11 May 2016 20:37, "Bharathi Raja" 
> wrote:
Hi,

I have a dataframe column col1 with values something like 
“/client/service/version/method”. The number of “/” is not constant.
Could you please help me to extract all methods from the column col1?

In Pig I used SUBSTRING with LAST_INDEX_OF(“/”).

Thanks in advance.
Regards,
Raja



RE: Parse Json in Spark

2016-05-09 Thread Ewan Leith
The simplest way is probably to use the sc.binaryFiles or sc.wholeTextFiles API 
to create an RDD containing the JSON files (maybe need a 
sc.wholeTextFiles(…).map(x => x._2) to drop off the filename column) then do a 
sqlContext.read.json(rddName)

That way, you don’t need to worry about combining lines.
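
Something along these lines, as a sketch (the path is illustrative):

// wholeTextFiles gives (filename, fileContents) pairs; keep just the contents
val jsonRdd = sc.wholeTextFiles("/path/to/json/dir").map(_._2)

// each multi-line file arrives as a single string, so read.json can infer the schema from it
val df = sqlContext.read.json(jsonRdd)
df.printSchema()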

Ewan

From: KhajaAsmath Mohammed [mailto:mdkhajaasm...@gmail.com]
Sent: 08 May 2016 23:20
To: user @spark 
Subject: Parse Json in Spark

Hi,

I am working on parsing JSON in Spark, but most of the information available 
online states that I need to have the entire JSON in a single line.

In my case, the JSON file is delivered in a complex structure and not in a single 
line. Does anyone know how to process this in Spark?

I used the Jackson jar to process the JSON and was able to do it when it is present in 
a single line. Any ideas?

Thanks,
Asmath


RE: Spark streaming - update configuration while retaining write ahead log data?

2016-03-15 Thread Ewan Leith
That’s what I thought, it’s a shame!

Thanks Saisai,

Ewan

From: Saisai Shao [mailto:sai.sai.s...@gmail.com]
Sent: 15 March 2016 09:22
To: Ewan Leith <ewan.le...@realitymine.com>
Cc: user <user@spark.apache.org>
Subject: Re: Spark streaming - update configuration while retaining write ahead 
log data?

Currently the configuration is part of the checkpoint data, and when recovering from 
failure, Spark Streaming will fetch the configuration from the checkpoint data, so 
even if you change the configuration file, the recovered Spark Streaming 
application will not use it. So from my understanding there's currently no way 
to handle your situation.

Thanks
Saisai

On Tue, Mar 15, 2016 at 5:12 PM, Ewan Leith 
<ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>> wrote:
Has anyone seen a way of updating the Spark streaming job configuration while 
retaining the existing data in the write ahead log?

e.g. if you’ve launched a job without enough executors and a backlog has built 
up in the WAL, can you increase the number of executors without losing the WAL 
data?

Thanks,
Ewan



Spark streaming - update configuration while retaining write ahead log data?

2016-03-15 Thread Ewan Leith
Has anyone seen a way of updating the Spark streaming job configuration while 
retaining the existing data in the write ahead log?

e.g. if you've launched a job without enough executors and a backlog has built 
up in the WAL, can you increase the number of executors without losing the WAL 
data?

Thanks,
Ewan


Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
Thanks, I'll create the JIRA for it. Happy to help contribute to a patch if we 
can, not sure if my own scala skills will be up to it but perhaps one of my 
colleagues' will :)

Ewan

I don't think that exists right now, but it's definitely a good option to have. 
I myself have run into this issue a few times.

Can you create a JIRA ticket so we can track it? Would be even better if you 
are interested in working on a patch! Thanks.


On Wed, Mar 2, 2016 at 11:51 AM, Ewan Leith 
<ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>> wrote:

Hi Reynold, yes that would be perfect for our use case.

I assume it doesn't exist though, otherwise I really need to go re-read the 
docs!

Thanks to both of you for replying by the way, I know you must be hugely busy.

Ewan

Are you looking for "relaxed" mode that simply return nulls for fields that 
doesn't exist or have incompatible schema?


On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith 
<ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>> wrote:

Thanks Michael, it's not a great example really, as the data I'm working with 
has some source files that do fit the schema, and some that don't (out of 
millions that do work, perhaps 10 might not).

In an ideal world for us the select would probably return the valid records 
only.

We're trying out the new dataset APIs to see if we can do some pre-filtering 
that way.

Thanks,
Ewan

-dev +user

StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

It's not a great error message, but as the schema above shows, stuff is an 
array, not a struct.  So, you need to pick a particular element (using []) 
before you can pull out a specific field.  It would be easier to see this if 
you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a tree view. 
 Try the following.

sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")

On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
<ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>> wrote:
When you create a dataframe using the sqlContext.read.schema() API, if you pass 
in a schema that’s compatible with some of the records, but incompatible with 
others, it seems you can’t do a .select on the problematic columns, instead you 
get an AnalysisException error.

I know loading the wrong data isn’t good behaviour, but if you’re reading data 
from (for example) JSON files, there’s going to be malformed files along the 
way. I think it would be nice to handle this error in a nicer way, though I 
don’t know the best way to approach it.

Before I raise a JIRA ticket about it, would people consider this to be a bug 
or expected behaviour?

I’ve attached a couple of sample JSON files and the steps below to reproduce 
it, by taking the inferred schema from the simple1.json file, and applying it 
to a union of simple1.json and simple2.json. You can visually see the data has 
been parsed as I think you’d want if you do a .select on the parent column and 
print out the output, but when you do a select on the problem column you 
instead get an exception.

scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at 
:27

scala> val s1schema = sqlContext.read.json(s1Rdd).schema
s1schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

scala> 
sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
[WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don Joeh]),null], 
[null,WrappedArray([ACME,2])]))]
[WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])], 
[WrappedArray([2,null]),null]))]

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")
org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' 
due to data type mismatch: argument 2 requires integral type, however, 
'onetype' is of string type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
at 
org.apache.spark.sql.catalys

Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
Hi Reynold, yes that would be perfect for our use case.

I assume it doesn't exist though, otherwise I really need to go re-read the 
docs!

Thanks to both of you for replying by the way, I know you must be hugely busy.

Ewan

Are you looking for "relaxed" mode that simply return nulls for fields that 
doesn't exist or have incompatible schema?


On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith 
<ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>> wrote:

Thanks Michael, it's not a great example really, as the data I'm working with 
has some source files that do fit the schema, and some that don't (out of 
millions that do work, perhaps 10 might not).

In an ideal world for us the select would probably return the valid records 
only.

We're trying out the new dataset APIs to see if we can do some pre-filtering 
that way.

Thanks,
Ewan

-dev +user

StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

It's not a great error message, but as the schema above shows, stuff is an 
array, not a struct.  So, you need to pick a particular element (using []) 
before you can pull out a specific field.  It would be easier to see this if 
you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a tree view. 
 Try the following.

sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")

On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
<ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>> wrote:
When you create a dataframe using the sqlContext.read.schema() API, if you pass 
in a schema that’s compatible with some of the records, but incompatible with 
others, it seems you can’t do a .select on the problematic columns, instead you 
get an AnalysisException error.

I know loading the wrong data isn’t good behaviour, but if you’re reading data 
from (for example) JSON files, there’s going to be malformed files along the 
way. I think it would be nice to handle this error in a nicer way, though I 
don’t know the best way to approach it.

Before I raise a JIRA ticket about it, would people consider this to be a bug 
or expected behaviour?

I’ve attached a couple of sample JSON files and the steps below to reproduce 
it, by taking the inferred schema from the simple1.json file, and applying it 
to a union of simple1.json and simple2.json. You can visually see the data has 
been parsed as I think you’d want if you do a .select on the parent column and 
print out the output, but when you do a select on the problem column you 
instead get an exception.

scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at 
:27

scala> val s1schema = sqlContext.read.json(s1Rdd).schema
s1schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

scala> 
sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
[WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don Joeh]),null], 
[null,WrappedArray([ACME,2])]))]
[WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])], 
[WrappedArray([2,null]),null]))]

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")
org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' 
due to data type mismatch: argument 2 requires integral type, however, 
'onetype' is of string type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)

(The full exception is attached too).

What do people think, is this a bug?

Thanks,
Ewan



Re: SFTP Compressed CSV into Dataframe

2016-03-02 Thread Ewan Leith
The Apache Commons library will let you access files on an SFTP server via a 
Java library, no local file handling involved

https://commons.apache.org/proper/commons-vfs/filesystems.html

Hope this helps,
Ewan

I wonder if anyone has opened a SFTP connection to open a remote GZIP CSV file? 
I am able to download the file first locally using the SFTP Client in the 
spark-sftp package. Then, I load the file into a dataframe using the spark-csv 
package, which automatically decompresses the file. I just want to remove the 
"downloading file to local" step and directly have the remote file 
decompressed, read, and loaded. Can someone give me any hints?

Thanks,
Ben






Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
Thanks Michael, it's not a great example really, as the data I'm working with 
has some source files that do fit the schema, and some that don't (out of 
millions that do work, perhaps 10 might not).

In an ideal world for us the select would probably return the valid records 
only.

We're trying out the new dataset APIs to see if we can do some pre-filtering 
that way.

Thanks,
Ewan

-dev +user

StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

It's not a great error message, but as the schema above shows, stuff is an 
array, not a struct.  So, you need to pick a particular element (using []) 
before you can pull out a specific field.  It would be easier to see this if 
you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a tree view. 
 Try the following.

sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")

On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
<ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>> wrote:
When you create a dataframe using the sqlContext.read.schema() API, if you pass 
in a schema that's compatible with some of the records, but incompatible with 
others, it seems you can't do a .select on the problematic columns, instead you 
get an AnalysisException error.

I know loading the wrong data isn't good behaviour, but if you're reading data 
from (for example) JSON files, there's going to be malformed files along the 
way. I think it would be nice to handle this error in a nicer way, though I 
don't know the best way to approach it.

Before I raise a JIRA ticket about it, would people consider this to be a bug 
or expected behaviour?

I've attached a couple of sample JSON files and the steps below to reproduce 
it, by taking the inferred schema from the simple1.json file, and applying it 
to a union of simple1.json and simple2.json. You can visually see the data has 
been parsed as I think you'd want if you do a .select on the parent column and 
print out the output, but when you do a select on the problem column you 
instead get an exception.

scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at 
:27

scala> val s1schema = sqlContext.read.json(s1Rdd).schema
s1schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), 
StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))

scala> 
sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
[WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don Joeh]),null], 
[null,WrappedArray([ACME,2])]))]
[WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])], 
[WrappedArray([2,null]),null]))]

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")
org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' 
due to data type mismatch: argument 2 requires integral type, however, 
'onetype' is of string type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)

(The full exception is attached too).

What do people think, is this a bug?

Thanks,
Ewan





RE: how to correctly run scala script using spark-shell through stdin (spark v1.0.0)

2016-01-26 Thread Ewan Leith
I’ve just tried running this using a normal stdin redirect:

~/spark/bin/spark-shell < simple.scala

Which worked: it started spark-shell, executed the script, then stopped the 
shell.

Thanks,
Ewan

From: Iulian Dragoș [mailto:iulian.dra...@typesafe.com]
Sent: 26 January 2016 15:00
To: fernandrez1987 
Cc: user 
Subject: Re: how to correctly run scala script using spark-shell through stdin 
(spark v1.0.0)


I don’t see -i in the output of spark-shell --help. Moreover, in master I get 
an error:

$ bin/spark-shell -i test.scala

bad option: '-i'

iulian
​

On Tue, Jan 26, 2016 at 3:47 PM, fernandrez1987 
> wrote:
spark-shell -i file.scala is not working for me in Spark 1.6.0, was this
removed or what do I have to take into account? The script does not get run
at all. What can be happening?











--

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com



RE: Write to S3 with server side encryption in KMS mode

2016-01-26 Thread Ewan Leith
Hi Nisrina, I’m not aware of any support for KMS keys in s3n, s3a or the EMR 
specific EMRFS s3 driver.

If you’re using EMRFS with Amazon’s EMR, you can use KMS keys with client-side 
encryption

http://docs.aws.amazon.com/kms/latest/developerguide/services-emr.html#emrfs-encrypt

If this has changed, I’d love to know, but I’m pretty sure it hasn’t.

The alternative is to write to HDFS, then copy the data across in bulk.

Thanks,
Ewan



From: Nisrina Luthfiyati [mailto:nisrina.luthfiy...@gmail.com]
Sent: 26 January 2016 10:42
To: user 
Subject: Write to S3 with server side encryption in KMS mode

Hi all,

I'm trying to save a spark application output to a bucket in S3. The data is 
supposed to be encrypted with S3's server side encryption using KMS mode, which 
typically (using java api/cli) would require us to pass the sse-kms key when 
writing the data. I currently have not found a way to do this using spark 
hadoop config. Would anyone have any idea how this can be done or whether this 
is possible?

Thanks,
Nisrina.



RE: Out of memory issue

2016-01-06 Thread Ewan Leith
Hi Muthu, this could be related to a known issue in the release notes

http://spark.apache.org/releases/spark-release-1-6-0.html

Known issues

SPARK-12546 -  Save DataFrame/table as Parquet with dynamic partitions may 
cause OOM; this can be worked around by decreasing the memory used by both 
Spark and Parquet using spark.memory.fraction (for example, 0.4) and 
parquet.memory.pool.ratio (for example, 0.3, in Hadoop configuration, e.g. 
setting it in core-site.xml).

It's definitely worth setting spark.memory.fraction and 
parquet.memory.pool.ratio and trying again.
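
For reference, a rough sketch of where each setting goes (the values are just the examples 
from the release notes):

In spark-defaults.conf (or via --conf on spark-submit):

  spark.memory.fraction  0.4

In core-site.xml (the Hadoop configuration):

  <property>
    <name>parquet.memory.pool.ratio</name>
    <value>0.3</value>
  </property>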

Ewan

-Original Message-
From: babloo80 [mailto:bablo...@gmail.com] 
Sent: 06 January 2016 03:44
To: user@spark.apache.org
Subject: Out of memory issue

Hello there,

I have a spark job that reads 7 parquet files (8 GB, 3 x 16 GB, 3 x 14 GB) in 
different stages of execution and creates a result parquet of 9 GB (about 27 
million rows containing 165 columns; some columns are map-based, containing at 
most 200-value histograms). The stages involve:
Step 1: Read the data using the dataframe API.
Step 2: Transform the dataframe to an RDD (as some of the columns are transformed 
into histograms, using an empirical distribution to cap the number of keys, and some 
of them run like a UDAF during the reduce-by-key step) and perform some transformations.
Step 3: Reduce the result by key so that the resultant can be used in the next stage for a join.
Step 4: Perform a left outer join of this result, which runs through similar Steps 1 thru 3.
Step 5: The results are further reduced to be written to parquet.

With Apache Spark 1.5.2, I am able to run the job with no issues.
Current env uses 8 nodes running a total of  320 cores, 100 GB executor memory 
per node with driver program using 32 GB. The approximate execution time is 
about 1.2 hrs. The parquet files are stored in another HDFS cluster for read 
and eventual write of the result.

When the same job is executed using Apache Spark 1.6.0, some of the executor nodes' 
JVMs get restarted (with a new executor id). On turning on GC stats on 
the executor, the perm-gen seems to get maxed out and ends up showing the 
symptoms of out-of-memory. 

Please advice on where to start investigating this issue. 

Thanks,
Muthu






RE: How to accelerate reading json file?

2016-01-06 Thread Ewan Leith
If you already know the schema, then you can run the read with the schema 
parameter like this:


val path = "examples/src/main/resources/jsonfile"

import org.apache.spark.sql.types._

// detailsSchema is assumed to be a nested StructType you've defined for the "details" field
val jsonSchema = StructType(
  StructField("id", StringType, true) ::
  StructField("reference", LongType, true) ::
  StructField("details", detailsSchema, true) ::
  StructField("value", StringType, true) :: Nil)

val people = sqlContext.read.schema(jsonSchema).json(path)
If you have the schema defined as a separate small JSON file, then you can load 
it by running something like this line to load it directly:

val jsonSchema = sqlContext.read.json(“path/to/schema”).schema

Thanks,
Ewan

From: Gavin Yue [mailto:yue.yuany...@gmail.com]
Sent: 06 January 2016 07:14
To: user 
Subject: How to accelerate reading json file?

I am trying to read json files following the example:

val path = "examples/src/main/resources/jsonfile"

val people = sqlContext.read.json(path)

I have 1 TB of files in the path. It took 1.2 hours to finish the reading and 
infer the schema.

But I already know the schema. Could I make this process shorter?

Thanks a lot.





Batch together RDDs for Streaming output, without delaying execution of map or transform functions

2015-12-31 Thread Ewan Leith
Hi all,

I'm sure this must have been solved already, but I can't see anything obvious.

Using Spark Streaming, I'm trying to execute a transform function on a DStream 
at short batch intervals (e.g. 1 second), but only write the resulting data to 
disk using saveAsTextFiles in a larger batch after a longer delay (say 60 
seconds).

I thought the ReceiverInputDStream window function might be a good help here, 
but instead, applying it to a transformed DStream causes the transform function 
to only execute at the end of the window too.

Has anyone got a solution to this?

Thanks,
Ewan





RE: Batch together RDDs for Streaming output, without delaying execution of map or transform functions

2015-12-31 Thread Ewan Leith
Yeah it's awkward, the transforms being done are fairly time sensitive, so I 
don't want them to wait 60 seconds or more.

I might have to move the code from a transform into a custom receiver instead, 
so they'll be processed outside the window length. A buffered writer is a good 
idea too, thanks.

Thanks,
Ewan

From: Ashic Mahtab [mailto:as...@live.com]
Sent: 31 December 2015 13:50
To: Ewan Leith <ewan.le...@realitymine.com>; Apache Spark 
<user@spark.apache.org>
Subject: RE: Batch together RDDs for Streaming output, without delaying 
execution of map or transform functions

Hi Ewan,
Transforms are definitions of what needs to be done - they don't execute until 
an action is triggered. For what you want, I think you might need to have an 
action that writes out RDDs to some sort of buffered writer.

-Ashic.

From: ewan.le...@realitymine.com<mailto:ewan.le...@realitymine.com>
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Batch together RDDs for Streaming output, without delaying execution 
of map or transform functions
Date: Thu, 31 Dec 2015 11:35:37 +
Hi all,

I'm sure this must have been solved already, but I can't see anything obvious.

Using Spark Streaming, I'm trying to execute a transform function on a DStream 
at short batch intervals (e.g. 1 second), but only write the resulting data to 
disk using saveAsTextFiles in a larger batch after a longer delay (say 60 
seconds).

I thought the ReceiverInputDStream window function might be a good help here, 
but instead, applying it to a transformed DStream causes the transform function 
to only execute at the end of the window too.

Has anyone got a solution to this?

Thanks,
Ewan





RE: Size exceeds Integer.MAX_VALUE on EMR 4.0.0 Spark 1.4.1

2015-11-16 Thread Ewan Leith
How big do you expect the file to be? Spark has issues with single blocks over 
2GB (see https://issues.apache.org/jira/browse/SPARK-1476 and 
https://issues.apache.org/jira/browse/SPARK-6235 for example)

If you don’t know, try running

df.repartition(100).write.format…

to get an idea of how  big it would be, I assume it’s over 2 GB
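
i.e. something like this, reusing your spark-csv options (100 partitions is just a 
starting point to experiment with):

df.repartition(100)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("s3://newcars.csv")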

From: Zhang, Jingyu [mailto:jingyu.zh...@news.com.au]
Sent: 16 November 2015 10:17
To: user 
Subject: Size exceeds Integer.MAX_VALUE on EMR 4.0.0 Spark 1.4.1


I am using spark-csv to save files in S3, and it shows "Size exceeds 
Integer.MAX_VALUE". Please let me know how to fix it. Thanks.

df.write()

.format("com.databricks.spark.csv")

.option("header", "true")

.save("s3://newcars.csv");

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)

at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)

at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)

at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)

at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)

at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)

at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)

at 
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)

at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)

at 
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)

at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)

at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)

at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)

at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)

at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)

at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)

at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:70)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)




RE: Spark Streaming - use the data in different jobs

2015-10-19 Thread Ewan Leith
Storing the data in HBase, Cassandra, or similar is possibly the right answer; the 
other option that can work well is re-publishing the data back into a second 
queue on RabbitMQ, to be read again by the next job.
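
If you go the re-publish route, a minimal sketch of pushing each processed batch back to 
a second queue with the RabbitMQ Java client (the stream, host and queue names are 
illustrative, and you'd want connection re-use and error handling in practice):

import com.rabbitmq.client.ConnectionFactory

processedStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // a connection per partition; a shared or pooled connection would be better in practice
    val factory = new ConnectionFactory()
    factory.setHost("rabbitmq-host")
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("processed-events", true, false, false, null)
    partition.foreach { record =>
      channel.basicPublish("", "processed-events", null, record.toString.getBytes("UTF-8"))
    }
    channel.close()
    connection.close()
  }
}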

Thanks,
Ewan

From: Oded Maimon [mailto:o...@scene53.com]
Sent: 18 October 2015 12:49
To: user 
Subject: Spark Streaming - use the data in different jobs

Hi,
we've built a spark streaming process that gets data from a pub/sub (rabbitmq in 
our case).

now we want the streamed data to be used in different spark jobs (also in 
realtime if possible).

what options do we have for doing that?


  *   can the streaming process and different spark jobs share/access the same 
RDD's?
  *   can the streaming process create a sparkSQL table and other jobs read/use 
it?
  *   can a spark streaming process trigger other spark jobs and send the the 
data (in memory)?
  *   can a spark streaming process cache the data in memory and other 
scheduled jobs access same rdd's?
  *   should we keep the data to hbase and read it from other jobs?
  *   other ways?

I believe that the answer will be using external db/storage..  hoping to have a 
different solution :)

Thanks.


Regards,
Oded Maimon
Scene53.




RE: Should I convert json into parquet?

2015-10-19 Thread Ewan Leith
As Jörn says, Parquet and ORC will get you really good compression and can be 
much faster. There also some nice additions around predicate pushdown which can 
be great if you've got wide tables.

Parquet is obviously easier to use, since it's bundled into Spark. Using ORC is 
described here 
http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/
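
For example, a rough sketch of writing and reading ORC with a HiveContext (paths are 
illustrative):

// ORC support comes via the Hive integration, so use a HiveContext rather than a plain SQLContext
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)

val events = hiveContext.read.json("/path/to/events.json")
events.write.format("orc").save("/path/to/events_orc")

val reloaded = hiveContext.read.format("orc").load("/path/to/events_orc")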

Thanks,
Ewan

-Original Message-
From: Jörn Franke [mailto:jornfra...@gmail.com] 
Sent: 19 October 2015 06:32
To: Gavin Yue 
Cc: user 
Subject: Re: Should I convert json into parquet?



Good Formats are Parquet or ORC. Both can be useful with compression, such as 
Snappy.   They are much faster than JSON. however, the table structure is up to 
you and depends on your use case.

> On 17 Oct 2015, at 23:07, Gavin Yue  wrote:
> 
> I have json files which contain timestamped events. Each event is associated 
> with a user id. 
> 
> Now I want to group by user id. So converts from
> 
> Event1 -> UserIDA;
> Event2 -> UserIDA;
> Event3 -> UserIDB;
> 
> To intermediate storage. 
> UserIDA -> (Event1, Event2...)
> UserIDB-> (Event3...)
> 
> Then I will label positives and featurize the Events Vector in many different 
> ways, fit each of them into the Logistic Regression. 
> 
> I want to save intermediate storage permanently since it will be used many 
> times. And there will be new events coming every day. So I need to update this 
> intermediate storage every day. 
> 
> Right now I store intermediate storage using Json files.  Should I use 
> Parquet instead?  Or is there better solutions for this use case?
> 
> Thanks a lot !
> 
> 
> 
> 
> 
> 




RE: Need for advice - performance improvement and out of memory resolution

2015-09-30 Thread Ewan Leith
Try reducing the number of workers to 2, and increasing their memory up to 6GB.

However, I've seen mention of a bug in the pyspark API when calling head() 
on a dataframe in spark 1.5.0 and 1.4; it's got a big performance hit.

https://issues.apache.org/jira/browse/SPARK-10731

It's fixed in spark 1.5.1 which was released yesterday, so maybe try upgrading.

Ewan


-Original Message-
From: camelia [mailto:came...@chalmers.se] 
Sent: 30 September 2015 10:51
To: user@spark.apache.org
Subject: Need for advice - performance improvement and out of memory resolution

Hello,

I am working on a machine learning project, currently using
spark-1.4.1-bin-hadoop2.6 in local mode on a laptop (Ubuntu 14.04 OS running on 
a Dell laptop with i7-5600@2.6 GHz * 4 cores, 15.6 GB RAM). I should also mention I am 
working in Python from an IPython notebook.
 

I face the following problem: when working with a Dataframe created from a CSV 
file (2.7 GB) with schema inferred (1900 features), the time it takes for Spark 
to count the 145231 rows is 3:30 minutes using 4 cores. Longer times are 
recorded for computing one feature's statistics, for example:


START AT: 2015-09-21
08:56:41.136947
 
+-------+------------------+
|summary|          VAR_1933|
+-------+------------------+
|  count|            145231|
|   mean| 8849.839111484464|
| stddev|3175.7863998269395|
|    min|                 0|
|    max|                  |
+-------+------------------+

 
FINISH AT: 2015-09-21
09:02:49.452260



So, my first question would be what configuration parameters to set in order to 
improve this performance?

I tried some explicit configuration in the IPython notebook, but specifying 
resources explicitly when creating the Spark configuration resulted in worse 
performance; I mean :

config =
SparkConf().setAppName("cleankaggle").setMaster("local[4]").set("spark.jars",
jar_path)

worked twice faster than:

config =
SparkConf().setAppName("cleankaggle").setMaster("local[4]").set("spark.jars",
jar_path).set("spark.driver.memory", "2g").set("spark.python.worker.memory
", "3g")





Secondly, when I do the one hot encoding (I tried two different ways of keeping 
results) I don't arrive at showing the head(1) of the resulted dataframe. We 
have the function :

def OHE_transform(categ_feat, df_old):
outputcolname = categ_feat + "_ohe_index"
outputcolvect = categ_feat + "_ohe_vector"
stringIndexer = StringIndexer(inputCol=categ_feat,
outputCol=outputcolname)
indexed = stringIndexer.fit(df_old).transform(df_old)
encoder = OneHotEncoder(inputCol=outputcolname, outputCol=outputcolvect)
encoded = encoder.transform(indexed)
return encoded


The two manners for keeping results are depicted below:

1)

result = OHE_transform(list_top_feat[0], df_top_categ) for item in 
list_top_feat[1:]:
result = OHE_transform(item, result)
result.head(1)


2)

df_result = OHE_transform("VAR_A", df_top_categ)
df_result_1 = OHE_transform("VAR_B", df_result)
df_result_2 = OHE_transform("VAR_C", df_result_1) ...
df_result_12 = OHE_transform("VAR_X", df_result_11)
df_result_12.head(1)

In the first approach, at the third iteration (in the for loop), when it was 
supposed to print the head(1), the IPython notebook  remained in the state 
"Kernel busy" for several hours and then I interrupted the kernel.
The second approach managed to go through all transformations (please note that 
here I eliminated the intermediary prints of the head(1)), but it gave an "out 
of memory" error at the only (final result) head(1),  that I paste below :

===

 

df_result_12.head(1)

---
Py4JJavaError Traceback (most recent call last)
 in ()
> 1 df_result_12.head(1)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in 
head(self, n)
649 rs = self.head(1)
650 return rs[0] if rs else None
--> 651 return self.take(n)
652 
653 @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in 
take(self, num)
305 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
306 """
--> 307 return self.limit(num).collect()
308 
309 @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in
collect(self)
279 """
280 with SCCallSiteSync(self._sc) as css:
--> 281 port =
self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
282 rs = list(_load_from_socket(port,
BatchedSerializer(PickleSerializer(
283 cls = _create_cls(self.schema)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, 

RE: Converting a DStream to schemaRDD

2015-09-29 Thread Ewan Leith
Something like:

dstream.foreachRDD { rdd =>
  val df =  sqlContext.read.json(rdd)
  df.select(…)
}

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams


Might be the place to start, it’ll convert each batch of dstream into an RDD 
then let you work it as if it were a standard RDD dataset.

Ewan


From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: 29 September 2015 15:03
To: user 
Subject: Converting a DStream to schemaRDD

Hi,
I have a DStream which is a stream of RDD[String].

How can I pass a DStream to sqlContext.jsonRDD and work with it as a DF ?

Thank you.
Daniel



SQLContext.read().json() inferred schema - force type to strings?

2015-09-25 Thread Ewan Leith
Hi all,

We're uising SQLContext.read.json to read in a stream of JSON datasets, but 
sometimes the inferred schema contains for the same value a LongType, and 
sometimes a DoubleType.

This obviously causes problems with merging the schema, so does anyone know a 
way of forcing the inferred schema to always mark every type as a StringType, 
then we can handle the type checking ourselves?

I know we could use a specified schema, but that loses some of the flexibility 
we're getting at the moment.
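
One workaround we've been looking at (a rough sketch only, untested against nested structures, with an illustrative path) is to infer the schema once, rewrite every top-level field to StringType while keeping the inferred names, then re-read with that forced schema:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// first pass: let Spark infer the field names
val inferred = sqlContext.read.json("s3://bucket/path/data.json").schema

// force every top-level field to StringType, keeping the inferred names
val allStrings = StructType(inferred.fields.map(f => StructField(f.name, StringType, nullable = true)))

// second pass: re-read with the forced schema, so LongType vs DoubleType conflicts disappear
val df = sqlContext.read.schema(allStrings).json("s3://bucket/path/data.json")

Nested structs would need the same rewrite applied recursively, so this is only a starting point.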

Thanks,
Ewan


Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submi

2015-09-19 Thread Ewan Leith
yarn-client still runs the executor tasks on the cluster, the main difference 
is where the driver job runs.


Thanks,

Ewan


-- Original message--

From: shahab

Date: Fri, 18 Sep 2015 13:11

To: Aniket Bhatnagar;

Cc: user@spark.apache.org;

Subject:Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected 
yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not 
supported directly by SparkContext. Please use spark-submit.


It works using yarn-client but I want to make it running on cluster. Is there 
any way to do so?

best,
/Shahab

On Fri, Sep 18, 2015 at 12:54 PM, Aniket Bhatnagar 
> wrote:

Can you try yarn-client mode?

On Fri, Sep 18, 2015, 3:38 PM shahab 
> wrote:
Hi,

Probably I have wrong zeppelin  configuration, because I get the following 
error when I execute spark statements in Zeppelin:

org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running 
on a cluster. Deployment to YARN is not supported directly by SparkContext. 
Please use spark-submit.


Anyone knows What's the solution to this?

best,
/Shahab



Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-10 Thread Ewan Leith
The last time I checked, if you launch EMR 4 with only Spark selected as an 
application, HDFS isn't correctly installed.


Did you select another application like Hive at launch time as well as Spark? 
If not, try that.


Thanks,

Ewan


-- Original message--

From: Dean Wampler

Date: Wed, 9 Sep 2015 22:29

To: shahab;

Cc: user@spark.apache.org;

Subject:Re: [Spark on Amazon EMR] : File does not exist: 
hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar


If you log into the cluster, do you see the file if you type:

hdfs dfs -ls 
hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

(with the correct server address for "ipx-x-x-x"). If not, is the server 
address correct and routable inside the cluster. Recall that EC2 instances have 
both public and private host names & IP addresses.

Also, is the port number correct for HDFS in the cluster?

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd 
Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Wed, Sep 9, 2015 at 9:28 AM, shahab 
> wrote:
Hi,
I am using Spark on Amazon EMR. So far I have not succeeded to submit the 
application successfully, not sure what's problem. In the log file I see the 
followings.
java.io.FileNotFoundException: File does not exist: 
hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

However, even putting spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar in the fat 
jar file didn't solve the problem. I am out of clue now.
I want to submit a spark application, using aws web console, as a step. I 
submit the application as : spark-submit --deploy-mode cluster --class 
mypack.MyMainClass --master yarn-cluster s3://mybucket/MySparkApp.jar Is there 
any one who has similar problem with EMR?

best,
/Shahab



RE: NOT IN in Spark SQL

2015-09-04 Thread Ewan Leith
Spark SQL doesn’t support “NOT IN”, but I think HiveQL does, so give using the 
HiveContext a try rather than SQLContext. Here’s the spark 1.2 docs on it, but 
it’s basically identical to running the SQLContext

https://spark.apache.org/docs/1.2.0/sql-programming-guide.html#tab_scala_6
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/sql/hive/HiveContext.html
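
A rough sketch of what that looks like (untested against the 1.2 HiveQL parser, table names are just examples, and if NOT IN still isn't accepted then the LEFT OUTER JOIN suggestion quoted below is a solid fallback):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// assumes A and B are already registered as temp tables or exist in the Hive metastore
val result = hiveContext.sql("SELECT id FROM A WHERE id NOT IN (SELECT id FROM B)")
result.collect().foreach(println)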

Thanks,
Ewan

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: 04 September 2015 13:12
To: Pietro Gentile 
Cc: user@spark.apache.org
Subject: Re: NOT IN in Spark SQL

I think spark doesn't support NOT IN clauses, but you can do the same with a 
LEFT OUTER JOIN, Something like:

SELECT A.id FROM A LEFT OUTER JOIN B ON (B.id = A.id) WHERE B.id IS null

Thanks
Best Regards

On Thu, Sep 3, 2015 at 8:46 PM, Pietro Gentile 
>
 wrote:
Hi all,

How can I use the "NOT IN" clause in Spark SQL 1.2?

It keeps giving me syntax errors, but the query is valid SQL.

Thanks in advance,
Best regards,

Pietro.



spark-csv package - output to filename.csv?

2015-09-03 Thread Ewan Leith
Using the spark-csv package or outputting to text files, you end up with files 
named:

test.csv/part-00

rather than a more user-friendly "test.csv", even if there's only 1 part file.

We can merge the files using the Hadoop merge command with something like this 
code from http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/


def merge(sc: SparkContext, srcPath: String, dstPath: String): Unit = {

val srcFileSystem = FileSystem.get(new URI(srcPath), sc.hadoopConfiguration)

val dstFileSystem = FileSystem.get(new URI(dstPath), sc.hadoopConfiguration)

dstFileSystem.delete(new Path(dstPath), true)

FileUtil.copyMerge(srcFileSystem, new Path(srcPath), dstFileSystem, new 
Path(dstPath), true, sc.hadoopConfiguration, null)

  }

but does anyone know a way without dropping down to Hadoop.fs code?
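
The closest pure-Spark workaround I know of is coalescing to a single partition before the write, but the output is still a directory containing one part file, so it doesn't really solve the naming problem, and it forces everything through one task (a sketch only, assuming df is the DataFrame being written and spark-csv is on the classpath):

df.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("test.csv")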

Thanks,
Ewan


RE: How to Take the whole file as a partition

2015-09-03 Thread Ewan Leith
Have a look at the sparkContext.binaryFiles, it works like wholeTextFiles but 
returns a PortableDataStream per file. It might be a workable solution though 
you'll need to handle the binary to UTF-8 or equivalent conversion
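
A rough sketch of that conversion, one record per file (path and encoding are assumptions):

import java.nio.charset.StandardCharsets

// (file path, whole file decoded as UTF-8) - each file stays in a single partition
val files = sc.binaryFiles("hdfs:///data/input/*")
val contents = files.map { case (path, stream) =>
  (path, new String(stream.toArray, StandardCharsets.UTF_8))
}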

Thanks,
Ewan

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: 03 September 2015 15:22
To: user@spark.apache.org
Subject: How to Take the whole file as a partition

Hi All,

I have 1000 files, from 500M to 1-2GB at this moment. And I want my spark can 
read them as partition on the file level. Which means want the FileSplit turn 
off.

I know there are some solutions, but not very good in my case:
1, I can't use WholeTextFiles method, because my file is too big, I don't want 
to risk the performance.
2, I try to use newAPIHadoopFile and turnoff the file split:

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
                LongWritable.class, Text.class, hadoopConf).values()
        .map(new Function<Text, String>() {
            @Override
            public String call(Text arg0) throws Exception {
                return arg0.toString();
            }
        });

This works for some cases, but it truncate some lines (I am not sure why, but 
it looks like there is a limit on this file reading). I have a feeling that the 
spark truncate this file on 2GB bytes. Anyway it happens (because same data has 
no issue when I use mapreduce to do the input), the spark sometimes do a trunc 
on very big file if try to read all of them.

3, I can do another way is distribute the file name as the input of the Spark 
and in function open stream to read the file directly. This is what I am 
planning to do but I think it is ugly. I want to know anyone have better 
solution for it?

BTW: the file currently in text format, but it might be parquet format later, 
that is also reason I don't like my third option.

Regards,

Shuai


Re: Problem while loading saved data

2015-09-03 Thread Ewan Leith
From that, I'd guess that HDFS isn't set up between the nodes, or for some 
reason writes are defaulting to file:///path/ rather than hdfs:///path/




-- Original message--

From: Amila De Silva

Date: Thu, 3 Sep 2015 17:12

To: Ewan Leith;

Cc: user@spark.apache.org;

Subject:Re: Problem while loading saved data


Hi Ewan,

Yes, 'people.parquet' is from the first attempt and in that attempt it tried to 
save the same people.json.

It seems that the same folder is created on both the nodes and contents of the 
files are distributed between the two servers.

On the master node(this is the same node which runs IPython Notebook) this is 
what I have:

people.parquet
└── _SUCCESS

On the slave I get,
people.parquet
└── _temporary
└── 0
├── task_201509030057_4699_m_00
│   └── part-r-0-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
├── task_201509030057_4699_m_01
│   └── part-r-1-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
└── _temporary

I have zipped and attached both the folders.

On Thu, Sep 3, 2015 at 5:58 PM, Ewan Leith 
<ewan.le...@realitymine.com> wrote:
Your error log shows you attempting to read from 'people.parquet2' not 
‘people.parquet’ as you’ve put below, is that just from a different attempt?

Otherwise, it’s an odd one! There aren’t _SUCCESS, _common_metadata and 
_metadata files under people.parquet that you’ve listed below, which would 
normally be created when the write completes, can you show us your write output?


Thanks,
Ewan



From: Amila De Silva [mailto:jaa...@gmail.com]
Sent: 03 September 2015 05:44
To: Guru Medasani <gdm...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Problem while loading saved data

Hi Guru,

Thanks for the reply.

Yes, I checked if the file exists. But instead of a single file what I found 
was a directory having the following structure.

people.parquet
└── _temporary
└── 0
├── task_201509030057_4699_m_00
│   └── part-r-0-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
├── task_201509030057_4699_m_01
│   └── part-r-1-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
└── _temporary


On Thu, Sep 3, 2015 at 7:13 AM, Guru Medasani 
<gdm...@gmail.com> wrote:
Hi Amila,

Error says that the ‘people.parquet’ file does not exist. Can you manually 
check to see if that file exists?


Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data file or summary file found under file:/home/ubuntu/ipython/people.parquet2.



Guru Medasani
gdm...@gmail.com



On Sep 2, 2015, at 8:25 PM, Amila De Silva 
<jaa...@gmail.com> wrote:

Hi All,

I have a two node spark cluster, to which I'm connecting using IPython notebook.
To see how data saving/loading works, I simply created a dataframe using 
people.json using the Code below;

df = sqlContext.read.json("examples/src/main/resources/people.json")

Then called the following to save the dataframe as a parquet.
df.write.save("people.parquet")

Tried loading the saved dataframe using;
df2 = sqlContext.read.parquet('people.parquet');

But this simply fails giving the following exception


---

Py4JJavaError Traceback (most recent call last)

 in ()

> 1 df2 = sqlContext.read.parquet('people.parquet2');



/srv/spark/python/pyspark/sql/readwriter.pyc in parquet(self, *path)

154 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 
'int')]

155 """

--> 156 return 
self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))

157

158 @since(1.4)



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

536 answer = self.gateway_client.send_command(command)

537 return_value = get_return_value(answer, self.gateway_client,

--> 538 self.target_id, self.name)

539

540 for temp_arg in temp_args:



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

298 raise Py4JJavaError(

299 'An error occurred while calling {0}{1}{2}.\n'.

--> 300 format(target_id, '.', name), value)

301 else:

302 raise Py4JError(



Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data

RE: Problem while loading saved data

2015-09-03 Thread Ewan Leith
Your error log shows you attempting to read from 'people.parquet2' not 
‘people.parquet’ as you’ve put below, is that just from a different attempt?

Otherwise, it’s an odd one! There aren’t _SUCCESS, _common_metadata and 
_metadata files under people.parquet that you’ve listed below, which would 
normally be created when the write completes, can you show us your write output?


Thanks,
Ewan



From: Amila De Silva [mailto:jaa...@gmail.com]
Sent: 03 September 2015 05:44
To: Guru Medasani 
Cc: user@spark.apache.org
Subject: Re: Problem while loading saved data

Hi Guru,

Thanks for the reply.

Yes, I checked if the file exists. But instead of a single file what I found 
was a directory having the following structure.

people.parquet
└── _temporary
└── 0
├── task_201509030057_4699_m_00
│   └── part-r-0-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
├── task_201509030057_4699_m_01
│   └── part-r-1-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
└── _temporary


On Thu, Sep 3, 2015 at 7:13 AM, Guru Medasani 
> wrote:
Hi Amila,

Error says that the ‘people.parquet’ file does not exist. Can you manually 
check to see if that file exists?


Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data file or summary file found under file:/home/ubuntu/ipython/people.parquet2.



Guru Medasani
gdm...@gmail.com



On Sep 2, 2015, at 8:25 PM, Amila De Silva 
> wrote:

Hi All,

I have a two node spark cluster, to which I'm connecting using IPython notebook.
To see how data saving/loading works, I simply created a dataframe using 
people.json using the Code below;

df = sqlContext.read.json("examples/src/main/resources/people.json")

Then called the following to save the dataframe as a parquet.
df.write.save("people.parquet")

Tried loading the saved dataframe using;
df2 = sqlContext.read.parquet('people.parquet');

But this simply fails giving the following exception


---

Py4JJavaError Traceback (most recent call last)

 in ()

> 1 df2 = sqlContext.read.parquet('people.parquet2');



/srv/spark/python/pyspark/sql/readwriter.pyc in parquet(self, *path)

154 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 
'int')]

155 """

--> 156 return 
self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))

157

158 @since(1.4)



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

536 answer = self.gateway_client.send_command(command)

537 return_value = get_return_value(answer, self.gateway_client,

--> 538 self.target_id, self.name)

539

540 for temp_arg in temp_args:



/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

298 raise Py4JJavaError(

299 'An error occurred while calling {0}{1}{2}.\n'.

--> 300 format(target_id, '.', name), value)

301 else:

302 raise Py4JError(



Py4JJavaError: An error occurred while calling o53840.parquet.

: java.lang.AssertionError: assertion failed: No schema defined, and no Parquet 
data file or summary file found under file:/home/ubuntu/ipython/people.parquet2.

   at scala.Predef$.assert(Predef.scala:179)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.org$apache$spark$sql$parquet$ParquetRelation2$MetadataCache$$readSchema(newParquet.scala:429)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)

   at scala.Option.orElse(Option.scala:257)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:369)

   at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:126)

   at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:124)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)

   at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)

   at scala.Option.getOrElse(Option.scala:120)

   at 

RE: spark 1.4.1 saveAsTextFile (and Parquet) is slow on emr-4.0.0

2015-09-03 Thread Ewan Leith
For those who have similar issues on EMR writing Parquet files, if you update 
mapred-site.xml with the following lines:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
<property><name>parquet.enable.summary-metadata</name><value>false</value></property>
<property><name>spark.sql.parquet.output.committer.class</name><value>org.apache.spark.sql.parquet.DirectParquetOutputCommitter</value></property>

Then you get Parquet files writing direct to S3 without use of temporary files 
too, and the disabled summary-metadata files which can cause a performance hit 
with writing large Parquet datasets on S3

The easiest way to add them across the cluster is via the –configurations flag 
on the “aws emr create-cluster” command

Thanks,
Ewan


From: Alexander Pivovarov [mailto:apivova...@gmail.com]
Sent: 03 September 2015 00:12
To: Neil Jonkers 
Cc: user@spark.apache.org
Subject: Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

Hi Neil

Yes! it helps!!! I do  not see _temporary in console output anymore.  
saveAsTextFile is fast now.
2015-09-02 23:07:00,022 INFO  [task-result-getter-0] scheduler.TaskSetManager 
(Logging.scala:logInfo(59)) - Finished task 18.0 in stage 0.0 (TID 18) in 4398 
ms on ip-10-0-24-103.ec2.internal (1/24)
2015-09-02 23:07:01,887 INFO  [task-result-getter-2] scheduler.TaskSetManager 
(Logging.scala:logInfo(59)) - Finished task 5.0 in stage 0.0 (TID 5) in 6282 ms 
on ip-10-0-26-14.ec2.internal (24/24)
2015-09-02 23:07:01,888 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - ResultStage 0 (saveAsTextFile at :22) 
finished in 6.319 s
2015-09-02 23:07:02,123 INFO  [main] s3n.Jets3tNativeFileSystemStore 
(Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar 
tmp/test40_141_24_406/_SUCCESS 0

Thank you!

On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers 
> wrote:
Hi,
Can you set the following parameters in your mapred-site.xml file please:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
You can also config this at cluster launch time with the following 
Classification via EMR console:

classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]


Thank you

On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov 
> wrote:
I checked previous emr config (emr-3.8)
mapred-site.xml has the following setting
 
<property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
 


On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov 
> wrote:
Should I use DirectOutputCommitter?
spark.hadoop.mapred.output.committer.class  
com.appsflyer.spark.DirectOutputCommitter



On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov 
> wrote:
I run spark 1.4.1 in amazom aws emr 4.0.0

For some reason spark saveAsTextFile is very slow on emr 4.0.0 in comparison to 
emr 3.8  (was 5 sec, now 95 sec)

Actually saveAsTextFile says that it's done in 4.356 sec but after that I see 
lots of INFO messages with 404 error from com.amazonaws.latency logger for next 
90 sec

spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + 
"A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")

2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - ResultStage 5 (saveAsTextFile at :22) 
finished in 4.356 s
2015-09-01 21:16:17,637 INFO  [task-result-getter-2] cluster.YarnScheduler 
(Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all 
completed, from pool
2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at :22, 
took 4.547829 s
2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem 
(S3NativeFileSystem.java:listStatus(896)) - listStatus 
s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency 
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], 
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found 
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 
3B2F06FD11682D22), S3 Extended Request ID: 
C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], 
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], 
AWSRequestID=[3B2F06FD11682D22], 
ServiceEndpoint=[https://foo-bar.s3.amazonaws.com], Exception=1, 
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, 
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], 
HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], 
RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency 
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], 
ServiceName=[Amazon S3], 

RE: How to increase the Json parsing speed

2015-08-28 Thread Ewan Leith
Can you post roughly what you’re running as your Spark code? One issue I’ve 
seen before is that passing a directory full of files as a path 
“/path/to/files/” can be slow, while “/path/to/files/*” runs fast.

Also, if you’ve not seen it, have a look at the binaryFiles call

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

which can be a really handy way of manipulating files without reading them all 
into memory first – it returns a PortableDataStream which you can then handle 
in your java InputStreamReader code
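
For example, something along these lines (an untested sketch, with an illustrative path, that reuses one ObjectMapper per partition rather than one per line):

import com.fasterxml.jackson.databind.ObjectMapper

val parsed = sc.binaryFiles("hdfs:///data/json/*").mapPartitions { files =>
  val mapper = new ObjectMapper()  // created once per partition, on the executor
  files.flatMap { case (_, stream) =>
    // decode the whole file, then parse it line by line
    new String(stream.toArray, "UTF-8").split("\n").iterator.map(line => mapper.readTree(line))
  }
}
parsed.count()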

Ewan




From: Gavin Yue [mailto:yue.yuany...@gmail.com]
Sent: 28 August 2015 08:06
To: Sabarish Sasidharan sabarish.sasidha...@manthan.com
Cc: user user@spark.apache.org
Subject: Re: How to increase the Json parsing speed

500 each with 8GB memory.
I did the test again on the cluster.
I have 6000 files which generates 6000 tasks.  Each task takes 1.5 min to 
finish based on the Stats.
So theoretically it should take 15 mins roughly. With some additional overhead, 
it totally takes 18 mins.

Based on the local file parsing test, seems simply parsing the json is fast, 
which only takes 7 secs.

So I wonder where the additional 1 min is coming from.
Thanks again.


On Thu, Aug 27, 2015 at 11:44 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:
How many executors are you using when using Spark SQL?

On Fri, Aug 28, 2015 at 12:12 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:
I see that you are not reusing the same mapper instance in the Scala snippet.

Regards
Sab

On Fri, Aug 28, 2015 at 9:38 AM, Gavin Yue 
yue.yuany...@gmail.com wrote:
Just did some tests.
I have 6000 files, each has 14K records with 900Mb file size.  In spark sql, it 
would take one task roughly 1 min to parse.
On the local machine, using the same Jackson lib inside Spark lib. Just parse 
it.

FileInputStream fstream = new FileInputStream(testfile);
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
Long begin = System.currentTimeMillis();
while ((strLine = br.readLine()) != null) {
    JsonNode s = mapper.readTree(strLine);
}
System.out.println(System.currentTimeMillis() - begin);

In JDK8, it took 6270ms.
Same code in Scala, it would take 7486ms:

val begin = java.lang.System.currentTimeMillis()
for (line <- Source.fromFile(testfile).getLines()) {
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val s = mapper.readTree(line)
}
println(java.lang.System.currentTimeMillis() - begin)

One JSON record contains two fields: ID and List[Event].
I am guessing that putting all the events into the List takes the remaining time.
Any solution to speed this up?
Thanks a lot!


On Thu, Aug 27, 2015 at 7:45 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

For your jsons, can you tell us what is your benchmark when running on a single 
machine using just plain Java (without Spark and Spark sql)?

Regards
Sab
On 28-Aug-2015 7:29 am, Gavin Yue 
yue.yuany...@gmail.com wrote:
Hey

I am using the Json4s-Jackson parser coming with spark and parsing roughly 80m 
records with totally size 900mb.

But the speed is slow.  It took my 50 nodes(16cores cpu,100gb mem) roughly 
30mins to parse Json to use spark sql.

Jackson has the benchmark saying parsing should be ms level.

Any way to increase speed?

I am using spark 1.4 on Hadoop 2.7 with Java 8.

Thanks a lot !
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




--

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan 
India ICT)
+++



--

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan 
India ICT)
+++



RE: correct use of DStream foreachRDD

2015-08-28 Thread Ewan Leith
I think what you’ll want is to carry out the .map functions before the 
foreachRDD, something like:

val lines = ssc.textFileStream("/stream").map(Sensor.parseSensor).map(Sensor.convertToPut)

lines.foreachRDD { rdd =>
  // the parsing and conversion have already run on the executors in the maps above
  rdd.saveAsHadoopDataset(jobConfig)
}

Will perform the bulk of the work in the distributed processing, before the 
results are returned to the driver for writing to HBase.

Thanks,
Ewan

From: Carol McDonald [mailto:cmcdon...@maprtech.com]
Sent: 28 August 2015 15:30
To: user user@spark.apache.org
Subject: correct use of DStream foreachRDD

I would like to make sure that I am using the DStream foreachRDD operation 
correctly. I would like to read from a DStream, transform the input and write to 
HBase. The code below works, but I became confused when I read "Note that the 
function func is executed in the driver process"?


val lines = ssc.textFileStream("/stream")

lines.foreachRDD { rdd =>
  // parse the line of data into sensor object
  val sensorRDD = rdd.map(Sensor.parseSensor)

  // convert sensor data to put object and write to HBase table column family data
  new PairRDDFunctions(sensorRDD.
      map(Sensor.convertToPut)).
      saveAsHadoopDataset(jobConfig)
}


RE: Driver running out of memory - caused by many tasks?

2015-08-27 Thread Ewan Leith
Are you using the Kryo serializer? If not, have a look at it, it can save a lot 
of memory during shuffles

https://spark.apache.org/docs/latest/tuning.html
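
Enabling it is just a couple of settings on the SparkConf, for example (a sketch only - register whichever classes your job actually shuffles):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional, but registering classes avoids Kryo writing out full class names
  .registerKryoClasses(Array(classOf[KeyClass], classOf[ValueClass]))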

I did a similar task and had various issues with the volume of data being 
parsed in one go, but that helped a lot. It looks like the main difference between 
what you're doing and what I did is that my input classes were just a string and a byte 
array, which I then processed once they were read into the RDD; maybe your classes 
are memory heavy?


Thanks,
Ewan

-Original Message-
From: andrew.row...@thomsonreuters.com 
[mailto:andrew.row...@thomsonreuters.com] 
Sent: 27 August 2015 11:53
To: user@spark.apache.org
Subject: Driver running out of memory - caused by many tasks?

I have a spark v.1.4.1 on YARN job where the first stage has ~149,000 tasks 
(it’s reading a few TB of data). The job itself is fairly simple - it’s just 
getting a list of distinct values:

val days = spark
  .sequenceFile(inputDir, classOf[KeyClass], classOf[ValueClass])
  .sample(withReplacement = false, fraction = 0.01)
  .map(row => row._1.getTimestamp.toString("yyyy-MM-dd"))
  .distinct()
  .collect()

The cardinality of the ‘day’ is quite small - there’s only a handful. However, 
I’m frequently running into OutOfMemory issues on the driver. I’ve had it fail 
with 24GB RAM, and am currently nudging it upwards to find out where it works. 
The ratio between input and shuffle write in the distinct stage is about 
3TB:7MB. On a smaller dataset, it works without issue on a smaller (4GB) heap. 
In YARN cluster mode, I get a failure message similar to:

Container 
[pid=36844,containerID=container_e15_1438040390147_4982_01_01] is running 
beyond physical memory limits. Current usage: 27.6 GB of 27 GB physical memory 
used; 29.5 GB of 56.7 GB virtual memory used. Killing container.


Is the driver running out of memory simply due to the number of tasks, or is 
there something about the job program that’s causing it to put a lot of data 
into the driver heap and go oom? If the former, is there any general guidance 
about the amount of memory to give to the driver as a function of how many 
tasks there are?

Andrew


Selecting different levels of nested data records during one select?

2015-08-27 Thread Ewan Leith
Hello,

I'm trying to query a nested data record of the form:

root
|-- userid: string (nullable = true)
|-- datarecords: array (nullable = true)
||-- element: struct (containsNull = true)
|||-- name: string (nullable = true)
|||-- system: boolean (nullable = true)
|||-- time: string (nullable = true)
|||-- title: string (nullable = true)

Where for each userid record, there are many datarecords elements.

I'd like to be able to run the SQL equivalent of:

select userid, name, system, time, title

and get 1 output row per nested row, each one containing the matching userid 
for that row (if that makes sense!).

the explode function seemed like the place to start, but it seems I have to 
call it individually for each nested column, then I end up with a huge number 
of results based on a Cartesian join?

Is anyone able to point me in the right direction?

Thanks,
Ewan




RE: Create column in nested structure?

2015-08-13 Thread Ewan Leith
Never mind me, I've found an email to this list from Raghavendra Pandey which 
got me what I needed

val nestedCol = struct(df("nested2.column1"), df("nested2.column2"), df("flatcolumn"))
val df2 = df.select(df("nested1"), nestedCol as "nested2")

Thanks,
Ewan

From: Ewan Leith
Sent: 13 August 2015 15:44
To: user@spark.apache.org
Subject: Create column in nested structure?

Has anyone used withColumn (or another method) to add a column to an existing 
nested dataframe?

If I call:

df.withColumn("nested.newcolumn", df("oldcolumn"))

then it just creates a new top-level column with a "." in its name, not under the 
nested structure.

Thanks,
Ewan


Create column in nested structure?

2015-08-13 Thread Ewan Leith
Has anyone used withColumn (or another method) to add a column to an existing 
nested dataframe?

If I call:

df.withColumn("nested.newcolumn", df("oldcolumn"))

then it just creates a new top-level column with a "." in its name, not under the 
nested structure.

Thanks,
Ewan


Parquet file organisation for 100GB+ dataframes

2015-08-12 Thread Ewan Leith
Hi all,

Can anyone share their experiences working with storing and organising larger 
datasets with Spark?

I've got a dataframe stored in Parquet on Amazon S3 (using EMRFS) which has a 
fairly complex nested schema (based on JSON files), which I can query in Spark, 
but the initial setup takes a few minutes, as we've got roughly 5000 partitions 
and 150GB of compressed parquet part files.

Generally things work, but we seem to be hitting various limitations now we're 
working with 100+GB of data, such as the 2GB block size limit in Spark which 
means we need a large number of partitions, slow startup due to partition 
discovery, etc.

Storing data in one big dataframe has worked well so far, but do we need to 
look at breaking it out into multiple dataframes?

Has anyone got any advice on how to structure this?

Thanks,
Ewan



RE: Specifying the role when launching an AWS spark cluster using spark_ec2

2015-08-07 Thread Ewan Leith
You'll have a lot less hassle using the AWS EMR instances with Spark 1.4.1 for 
now, until the spark_ec2.py scripts move to Hadoop 2.7.1, at the moment I'm 
pretty sure it's only using Hadoop 2.4

The EMR setup with Spark lets you use s3:// URIs with IAM roles

Ewan

-Original Message-
From: SK [mailto:skrishna...@gmail.com] 
Sent: 06 August 2015 18:27
To: user@spark.apache.org
Subject: Specifying the role when launching an AWS spark cluster using spark_ec2

Hi,

I need to access data on S3 from another account and I have been given the IAM 
role information to access that S3 bucket. From what I understand, AWS allows 
us to attach a role to a resource at the time it is created. However, I don't 
see an option for specifying the role using the spark_ec2.py script. 
So I created a spark cluster using the default role, but I was not able to 
change its IAM role after creation through AWS console.

I see a ticket for this issue:
https://github.com/apache/spark/pull/6962 and the status is closed. 

If anyone knows how I can specify the role using spark_ec2.py, please let me 
know. I am using spark 1.4.1.

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-the-role-when-launching-an-AWS-spark-cluster-using-spark-ec2-tp24154.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Help accessing protected S3

2015-07-23 Thread Ewan Leith
I think the standard S3 driver used in Spark from the Hadoop project (S3n) 
doesn't support IAM role based authentication.

However, S3a should support it. If you're running Hadoop 2.6 via the spark-ec2 
scripts (I'm not sure what it launches with by default) try accessing your 
bucket via s3a:// URLs instead of s3n://
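
If the s3a classes are on the classpath (they come with the hadoop-aws module), the change is usually just the URL scheme, e.g. (bucket and path are placeholders):

// s3a picks up the instance's IAM role credentials automatically
val lines = sc.textFile("s3a://my-protected-bucket/path/to/data/*")
lines.take(5).foreach(println)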

http://wiki.apache.org/hadoop/AmazonS3

https://issues.apache.org/jira/browse/HADOOP-10400

Thanks,
Ewan



-Original Message-
From: Greg Anderson [mailto:gregory.ander...@familysearch.org] 
Sent: 22 July 2015 18:00
To: user@spark.apache.org
Subject: Help accessing protected S3

I have a protected s3 bucket that requires a certain IAM role to access.  When 
I start my cluster using the spark-ec2 script, everything works just fine until 
I try to read from that part of s3.  Here is the command I am using:

./spark-ec2 -k KEY -i KEY_FILE.pem --additional-security-group=IAM_ROLE 
--copy-aws-credentials --zone=us-east-1e -t m1.large --worker-instances=3 
--hadoop-major-version=2.7.1 --user-data=test.sh launch my-cluster

I have read through this article: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3-Bucket-Access-td16303.html

The problem seems to be very similar, but I wasn't able to find a solution in 
it for me.  I'm not sure what else to provide here, just let me know what you 
need.  Thanks in advance!
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: coalesce on dataFrame

2015-07-01 Thread Ewan Leith
It's in spark 1.4.0, or should be at least:

https://issues.apache.org/jira/browse/SPARK-6972
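
For example (a sketch - note the DataFrame API only exposes coalesce(n), so if you need the shuffle=true behaviour of the RDD version, repartition(1) is the nearest equivalent; the output path is illustrative):

// narrow the existing partitions down to one, no shuffle
val single = df.coalesce(1)

// rough equivalent of coalesce(1, true) on an RDD
val shuffled = df.repartition(1)

single.write.parquet("/output/path")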

Ewan

-Original Message-
From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] 
Sent: 01 July 2015 08:23
To: user@spark.apache.org
Subject: coalesce on dataFrame

How can we use coalesce(1, true) on dataFrame?


Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-on-dataFrame-tp23564.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark timesout maybe due to binaryFiles() with more than 1 million files in HDFS

2015-06-08 Thread Ewan Leith
Try putting a * on the end of xmlDir, i.e.

xmlDir = "hdfs:///abc/def/*"

Rather than

xmlDir = "hdfs:///abc/def"

and see what happens. I don't know why, but that appears to be more reliable 
for me with S3 as the filesystem.

I'm also using binaryFiles, but I've tried running the same command with 
wholeTextFiles and had the same error.

Ewan

-Original Message-
From: Kostas Kougios [mailto:kostas.koug...@googlemail.com] 
Sent: 08 June 2015 15:02
To: user@spark.apache.org
Subject: spark timesout maybe due to binaryFiles() with more than 1 million 
files in HDFS

I am reading millions of xml files via

val xmls = sc.binaryFiles(xmlDir)

The operation runs fine locally but on yarn it fails with:

 client token: N/A
 diagnostics: Application application_1433491939773_0012 failed 2 times due to 
ApplicationMaster for attempt appattempt_1433491939773_0012_02 timed out. 
Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1433750951883
 final status: FAILED
 tracking URL:
http://controller01:8088/cluster/app/application_1433491939773_0012
 user: ariskk
Exception in thread main org.apache.spark.SparkException: Application 
finished with failed status at 
org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

On hadoops/userlogs logs I am frequently getting these messages:

15/06/08 09:15:38 WARN util.AkkaUtils: Error sending message [message = 
Heartbeat(1,[Lscala.Tuple2;@2b4f336b,BlockManagerId(1,
controller01.stratified, 58510))] in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)

I run my spark job via spark-submit and it works for an other HDFS directory 
that contains only 37k files. Any ideas how to resolve this?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-timesout-maybe-due-to-binaryFiles-with-more-than-1-million-files-in-HDFS-tp23208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark timesout maybe due to binaryFiles() with more than 1 million files in HDFS

2015-06-08 Thread Ewan Leith
Can you do a simple

sc.binaryFiles("hdfs:///path/to/files/*").count()

in the spark-shell and verify that part works?

Ewan



-Original Message-
From: Konstantinos Kougios [mailto:kostas.koug...@googlemail.com] 
Sent: 08 June 2015 15:40
To: Ewan Leith; user@spark.apache.org
Subject: Re: spark timesout maybe due to binaryFiles() with more than 1 million 
files in HDFS

No luck I am afraid. After giving the namenode 16GB of RAM, I am still getting 
an out of mem exception, kind of different one:

15/06/08 15:35:52 ERROR yarn.ApplicationMaster: User class threw
exception: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1351)
 at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1413)
 at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1524)
 at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1533)
 at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:557)
 at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy10.getListing(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
 at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
 at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
 at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
 at
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
 at
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
 at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
 at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
 at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
 at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
 at
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:292)
 at
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
 at
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:47)
 at
org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:43)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)


and on the 2nd retry of spark, a similar exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
com.google.protobuf.LiteralByteString.toString(LiteralByteString.java:148)
 at com.google.protobuf.ByteString.toStringUtf8(ByteString.java:572)
 at
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.getOwner(HdfsProtos.java:21558)
 at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1413)
 at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1524)
 at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1533)
 at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:557)
 at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy10.getListing(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
 at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
 at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
 at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
 at
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755

RE: redshift spark

2015-06-05 Thread Ewan Leith
That project is for reading data in from Redshift table exports stored in s3 by 
running commands in redshift like this:

unload ('select * from venue')   
to 's3://mybucket/tickit/unload/'

http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html

The path in the parameters below is the s3 bucket path.
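
So a rough end-to-end sketch, reusing the unload prefix above (untested - depending on the filesystem you may need a trailing * on the path):

import com.databricks.spark.redshift.RedshiftInputFormat

// point the path at the S3 prefix used in the Redshift UNLOAD command
val records = sc.newAPIHadoopFile(
  "s3n://mybucket/tickit/unload/",
  classOf[RedshiftInputFormat],
  classOf[java.lang.Long],
  classOf[Array[String]])

// the values are the unloaded rows, already split into columns
val rows = records.values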

Hope this helps,
Ewan

-Original Message-
From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] 
Sent: 05 June 2015 15:25
To: user@spark.apache.org
Subject: redshift spark

Hi All,

I want to read and write data to aws redshift. I found spark-redshift project 
at following address.
https://github.com/databricks/spark-redshift

in its documentation there is following code is written. 
import com.databricks.spark.redshift.RedshiftInputFormat

val records = sc.newAPIHadoopFile(
  path,
  classOf[RedshiftInputFormat],
  classOf[java.lang.Long],
  classOf[Array[String]])

I am unable to understand it's parameters. Can somebody explain how to use 
this? what is meant by path in this case?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/redshift-spark-tp23175.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext save 
interface support using Avro as the schema structure when writing Parquet 
files, in a similar way to AvroParquetWriter (which I've got working)?

I've seen how you can load an avro file and save it as parquet from 
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
 but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan


RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Thanks Cheng, that's brilliant, you've saved me a headache.

Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:58
To: Ewan Leith; user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

That's right. Also, Spark SQL can automatically infer schema from JSON 
datasets. You don't need to specify an Avro schema:

   sqlContext.jsonFile("json/path").saveAsParquetFile("parquet/path")

or with the new reader/writer API introduced in 1.4-SNAPSHOT:

   sqlContext.read.json("json/path").write.parquet("parquet/path")

Cheng
On 5/19/15 6:07 PM, Ewan Leith wrote:
Thanks Cheng, that makes sense.

So for new dataframe creation (not conversion from Avro but from JSON or CSV 
inputs) in Spark we shouldn't worry about using Avro at all, just use the Spark 
SQL StructType when building new Dataframes? If so, that will be a lot simpler!

Thanks,
Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:01
To: Ewan Leith; user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we use StructType as the 
intermediate schema format. So when converting Avro files to Parquet files, we 
internally convert the Avro schema to a Spark SQL StructType first, and then convert 
the StructType to a Parquet schema.

Cheng
On 5/19/15 4:42 PM, Ewan Leith wrote:
Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext save 
interface support using Avro as the schema structure when writing Parquet 
files, in a similar way to AvroParquetWriter (which I've got working)?

I've seen how you can load an avro file and save it as parquet from 
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
 but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan




RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Thanks Cheng, that makes sense.

So for new dataframe creation (not conversion from Avro but from JSON or CSV 
inputs) in Spark we shouldn't worry about using Avro at all, just use the Spark 
SQL StructType when building new Dataframes? If so, that will be a lot simpler!
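
For reference, a minimal sketch of building a DataFrame straight from a StructType (field names, types and the output path are just examples):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val rows = sc.parallelize(Seq(Row("alice", 30), Row("bob", 25)))
val df = sqlContext.createDataFrame(rows, schema)
df.write.parquet("people.parquet")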

Thanks,
Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:01
To: Ewan Leith; user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we use StructType as the 
intermediate schema format. So when converting Avro files to Parquet files, we 
internally convert the Avro schema to a Spark SQL StructType first, and then convert 
the StructType to a Parquet schema.

Cheng
On 5/19/15 4:42 PM, Ewan Leith wrote:
Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext save 
interface support using Avro as the schema structure when writing Parquet 
files, in a similar way to AvroParquetWriter (which I've got working)?

I've seen how you can load an avro file and save it as parquet from 
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
 but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan