Re: share datasets across multiple spark-streaming applications for lookup

2017-11-02 Thread JG Perrin
Or Databricks Delta (announced at Spark Summit) or IBM Event Store, depending 
on the use case.

On Oct 31, 2017, at 14:30, Joseph Pride wrote:

Folks:

SnappyData.

I’m fairly new to working with it myself, but it looks pretty promising. It 
marries Spark with a co-located in-memory GemFire (or something gem-related) 
database. So you can access the data with SQL, JDBC, ODBC (if you wanna go 
Enterprise instead of open-source) or natively as mutable RDDs and DataFrames.

You can run it so the storage and Spark compute are co-located in the same JVM 
on each machine, so you get data locality instead of a bottleneck between load, 
save, and compute. The data is supposed to persist across applications and 
cluster restarts, and to remain available while multiple applications work on the 
data at the same time.

I hope it works for what I’m doing and isn’t too buggy. But it looks pretty 
good.

—Joe Pride

On Oct 31, 2017, at 11:14 AM, Gene Pang wrote:

Hi,

Alluxio enables sharing dataframes across different applications. This blog post 
talks about dataframes and Alluxio, and this Spark Summit presentation has 
additional information.

Thanks,
Gene

On Tue, Oct 31, 2017 at 6:04 PM, Revin Chalil wrote:
Any info on the below will be really appreciated.

I read about Alluxio and Ignite. Has anybody used any of them? Do they work 
well with multiple Apps doing lookups simultaneously? Are there better options? 
Thank you.

From: roshan joe
Date: Monday, October 30, 2017 at 7:53 PM
To: "user@spark.apache.org"
Subject: share datasets across multiple spark-streaming applications for lookup

Hi,

What is the recommended way to share datasets across multiple spark-streaming 
applications, so that the incoming data can be looked up against this shared 
dataset?

The shared dataset is also incrementally refreshed and stored on S3. Below is 
the scenario.

Streaming App-1 consumes data from Source-1 and writes to DS-1 in S3.
Streaming App-2 consumes data from Source-2 and writes to DS-2 in S3.


Streaming App-3 consumes data from Source-3, needs to look up against DS-1 and 
DS-2, and write to DS-3 in S3.
Streaming App-4 consumes data from Source-4, needs to look up against DS-1 and 
DS-2, and write to DS-3 in S3.
Streaming App-n consumes data from Source-n, needs to look up against DS-1 and 
DS-2, and write to DS-n in S3.

So DS-1 and DS-2 ideally should be shared for lookup across multiple streaming 
apps. Any input is appreciated. Thank you!
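
For illustration, a rough Java sketch of the lookup pattern, whatever shared 
store DS-1 and DS-2 end up living in (the bucket paths, key columns and the 
incomingBatch dataframe are made up for the example; assumes 
import static org.apache.spark.sql.functions.broadcast):

Dataset<Row> ds1 = spark.read().parquet("s3a://bucket/ds-1");
Dataset<Row> ds2 = spark.read().parquet("s3a://bucket/ds-2");

Dataset<Row> enriched = incomingBatch
    .join(broadcast(ds1), "lookup_key")   // broadcast() only if the lookup set is small enough
    .join(broadcast(ds2), "other_key");

enriched.write().mode("append").parquet("s3a://bucket/ds-3");

The reads would have to be refreshed whenever the shared datasets are 
incrementally updated, which is exactly where stores like Alluxio, SnappyData or 
Ignite try to help.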




Spark as ETL, was: Re: Dose pyspark supports python3.6?

2017-11-02 Thread JG Perrin
Pros:
No need for Scala skills, Java can be used.
Other companies are already doing it.
> Support Yarn execution
But not only…
Complex import use cases can easily be done in Java (see 
https://spark-summit.org/eu-2017/events/extending-apache-sparks-ingestion-building-your-own-java-data-source/
 - sorry, shameless self-promo).
Can be parallelized on all nodes of the cluster

Cons:
> No ETL GUI.
But it works very well with logging libraries like Log4j, so we can track the 
process.
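
To make the trade-off concrete, a minimal Java ingestion job is only a few lines 
(a rough sketch; the landing path and Hive table name are made up):

SparkSession spark = SparkSession.builder()
    .appName("CsvToHive")
    .enableHiveSupport()
    .getOrCreate();

Dataset<Row> df = spark.read()
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("hdfs:///landing/input.csv");      // hypothetical landing path

df.write().mode("overwrite").saveAsTable("staging.input_table");  // hypothetical Hive table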

hih

jg

On Nov 1, 2017, at 22:58, van den Heever, Christian CC wrote:

Dear Spark users

I have been asked to provide a presentation / business case for why to use Spark 
and Java as an ingestion tool for HDFS and Hive, and why to move away from an ETL 
tool.

Could you be so kind as to provide some pros and cons on this?

I have the following :

Pros:
In-house build – code can be changed on the fly to suit business needs.
Software is free.
Can run out of the box on all nodes.
Will support all Apache-based software.
Fast due to in-memory processing.
Spark UI can visualise execution.
Supports checkpointed data loads.
Supports a schema registry for custom schemas and inference.
Supports YARN execution.
MLlib can be used if needed.
Data lineage support due to Spark usage.

Cons
Skills needed to maintain and build.
In-memory capability can become a bottleneck if not managed.
No ETL GUI.

Maybe point me to an article if you have one.

Thanks a mill.
Christian










Re: Is Spark suited for this use case?

2017-10-20 Thread JG Perrin
I have seen a similar scenario where we load data from an RDBMS into a NoSQL 
database… Spark made sense for velocity and parallel processing (and the cost of 
licenses :) ).
 
> On Oct 15, 2017, at 21:29, Saravanan Thirumalai wrote:
> 
> We are an investment firm and have an MDM platform in Oracle at a vendor 
> location, and we use Oracle GoldenGate to replicate data to our data center for 
> reporting needs. 
> Our data is not big data (total size 6 TB including 2 TB of archive data). 
> Moreover, our data doesn't get updated often: once nightly (around 50 MB) plus 
> some correction transactions during the day (<10 MB). We don't have external 
> users, and hence the data doesn't grow in real time like e-commerce.
> 
> When we replicate data from source to target, we transfer data through files. 
> So, if there are DML operations (corrections) during the day on a source 
> table, the corresponding file would have perhaps 100 lines of table data 
> that need to be loaded into the target database. Due to the low volume of data 
> we designed this through Informatica, and it works in less than 2-5 minutes. 
> Can Spark be used in this case, or would it be overkill?
> 
> 
> 





Re: Java Rdd of String to dataframe

2017-10-20 Thread JG Perrin
SK,

Have you considered:
Dataset<Row> df = spark.read().json(dfWithStringRowsContainingJson);
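
A slightly fuller sketch, assuming Spark 2.2 and a JavaPairRDD whose values are 
the JSON documents (variable names are made up):

JavaRDD<String> jsonRdd = pairRdd.values();                        // drop the keys
Dataset<String> jsonDs = spark.createDataset(jsonRdd.rdd(), Encoders.STRING());
Dataset<Row> df = spark.read().json(jsonDs);                       // schema inferred from the data
df.printSchema();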

jg

> On Oct 11, 2017, at 16:35, sk skk  wrote:
> 
> Can we create a dataframe from a Java pair RDD of String? I don't have a 
> schema as it will be dynamic JSON. I used the Encoders.STRING() class.
> 
> Any help is appreciated !!
> 
> Thanks,
> SK



RE: How to convert Array of Json rows into Dataset of specific columns in Spark 2.2.0?

2017-10-10 Thread JG Perrin
Something along the lines of:

Dataset<Row> df = spark.read().json(jsonDf); ?
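
A slightly fuller hedged sketch, assuming Spark 2.2 and that ds is the dataset 
whose single string column is shown by printSchema() below:

Dataset<String> jsonStrings = ds.as(Encoders.STRING());   // value column -> Dataset<String>
Dataset<Row> parsed = spark.read().json(jsonStrings);     // schema is inferred
parsed.select("name", "address", "docs").show(false);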


From: kant kodali [mailto:kanth...@gmail.com]
Sent: Saturday, October 07, 2017 2:31 AM
To: user @spark 
Subject: How to convert Array of Json rows into Dataset of specific columns in 
Spark 2.2.0?


I have a Dataset ds which consists of json rows.

Sample Json Row (This is just an example of one row in the dataset)

[

{"name": "foo", "address": {"state": "CA", "country": "USA"}, 
"docs":[{"subject": "english", "year": 2016}]}

{"name": "bar", "address": {"state": "OH", "country": "USA"}, 
"docs":[{"subject": "math", "year": 2017}]}



]

ds.printSchema()

root

 |-- value: string (nullable = true)

Now I want to convert into the following dataset using Spark 2.2.0

name  | address   |  docs

--

"foo" | {"state": "CA", "country": "USA"} | [{"subject": "english", "year": 
2016}]

"bar" | {"state": "OH", "country": "USA"} | [{"subject": "math", "year": 2017}]

Preferably Java but Scala is also fine as long as there are functions available 
in Java API


RE: Spark 2.2.0 Win 7 64 bits Exception while deleting Spark temp dir

2017-10-03 Thread JG Perrin
Do you have a little more to share with us?

Maybe you can set another TEMP directory. Are you getting a result?
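
For example (a hedged suggestion; the directory is made up), pointing Spark's 
scratch space at a folder you fully control sometimes helps on Windows, either 
via the environment:

set SPARK_LOCAL_DIRS=C:\spark-temp
run-example SparkPi 10

or by setting spark.local.dir=C:\spark-temp in conf\spark-defaults.conf.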

From: usa usa [mailto:usact2...@gmail.com]
Sent: Tuesday, October 03, 2017 10:50 AM
To: user@spark.apache.org
Subject: Spark 2.2.0 Win 7 64 bits Exception while deleting Spark temp dir

Hi,
I have installed Spark 2.2.0 in win 7 64 bits.
When I did a test:
  c:>run-example SparkPI 10
I got error:
   Exception while deleting Spark temp dir
 C:\Users\jding01\AppData\Local\Temp\spark-xxx

The solution at

https://stackoverflow.com/questions/31274170/spark-error-error-utils-exception-while-deleting-spark-temp-dir
cannot help me.
Could anyone point out how to fix it ?
Thanks,
David


RE: Quick one... AWS SDK version?

2017-10-03 Thread JG Perrin
Sorry Steve - I may not have been very clear: thinking about 
aws-java-sdk-z.yy.xxx.jar. To the best of my knowledge, none is bundled with 
Spark.

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Tuesday, October 03, 2017 2:20 PM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: Quick one... AWS SDK version?


On 3 Oct 2017, at 02:28, JG Perrin <jper...@lumeris.com> wrote:

Hey Sparkians,

What version of AWS Java SDK do you use with Spark 2.2? Do you stick with the 
Hadoop 2.7.3 libs?

You generally have to stick with the version which Hadoop was built with, I'm 
afraid... very brittle dependency.


RE: Quick one... AWS SDK version?

2017-10-03 Thread JG Perrin
Thanks Yash… this is helpful!

From: Yash Sharma [mailto:yash...@gmail.com]
Sent: Tuesday, October 03, 2017 1:02 AM
To: JG Perrin <jper...@lumeris.com>; user@spark.apache.org
Subject: Re: Quick one... AWS SDK version?


Hi JG,
Here are my cluster configs if it helps.

Cheers.

EMR: emr-5.8.0
Hadoop distribution: Amazon 2.7.3
AWS sdk: /usr/share/aws/aws-java-sdk/aws-java-sdk-1.11.160.jar
Applications:
Hive 2.3.0
Spark 2.2.0
Tez 0.8.4

On Tue, 3 Oct 2017 at 12:29 JG Perrin <jper...@lumeris.com> wrote:
Hey Sparkians,

What version of AWS Java SDK do you use with Spark 2.2? Do you stick with the 
Hadoop 2.7.3 libs?

Thanks!

jg


Quick one... AWS SDK version?

2017-10-02 Thread JG Perrin
Hey Sparkians,

What version of AWS Java SDK do you use with Spark 2.2? Do you stick with the 
Hadoop 2.7.3 libs?

Thanks!

jg


RE: HDFS or NFS as a cache?

2017-10-02 Thread JG Perrin
Steve,

If I refer to the collect() API, it says “Running collect requires moving all 
the data into the application's driver process, and doing so on a very large 
dataset can crash the driver process with OutOfMemoryError.” So why would you 
need a distributed FS?

jg

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Saturday, September 30, 2017 6:10 AM
To: JG Perrin <jper...@lumeris.com>
Cc: Alexander Czech <alexander.cz...@googlemail.com>; user@spark.apache.org
Subject: Re: HDFS or NFS as a cache?


On 29 Sep 2017, at 20:03, JG Perrin <jper...@lumeris.com> wrote:

You will collect in the driver (often the master) and it will save the data, so 
for saving, you will not have to set up HDFS.

no, it doesn't work quite like that.

1. workers generate their data and save it somewhere
2. on "task commit" they move their data to some location where it will be 
visible for "job commit" (rename, upload, whatever)
3. job commit, which is done in the driver, takes all the committed task data 
and makes it visible in the destination directory.
4. Then they create a _SUCCESS file to say "done!"


This is done with Spark talking between workers and drivers to guarantee that 
only one task working on a specific part of the data commits its work, only 
committing the job once all tasks have finished.

The v1 mapreduce committer implements (2) by moving files under a job attempt 
dir, and (3) by moving them from the job attempt dir to the destination: one 
rename per task commit, another rename of every file on job commit. In HDFS, 
Azure WASB and other stores with an O(1) atomic rename, this isn't *too* 
expensive, though that final job commit rename still takes time to list and move 
lots of files.

The v2 committer implements (2) by renaming to the destination directory and 
(3) as a no-op. Rename in the tasks then, but not that second, serialized 
one at the end.
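
A hedged aside, not Steve's wording: the algorithm can be chosen per job through 
the usual spark.hadoop.* passthrough, e.g. in Java:

SparkSession spark = SparkSession.builder()
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate();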

There's no copy of data from workers to driver, instead you need a shared 
output filesystem so that the job committer can do its work alongside the tasks.

There are alternative committer algorithms:

1. look at Ryan Blue's talk: https://www.youtube.com/watch?v=BgHrff5yAQo
2. IBM Stocator paper (https://arxiv.org/abs/1709.01812) and code 
(https://github.com/SparkTC/stocator/)
3. Ongoing work in Hadoop itself for better committers. Goal: year end & Hadoop 
3.1 https://issues.apache.org/jira/browse/HADOOP-13786 . The code is all there, 
Parquet is a trouble spot, and more testing is welcome from anyone who wants to 
help.
4. Databricks have "something"; specifics aren't covered, but I assume it's 
DynamoDB based.


-Steve







From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
Sent: Friday, September 29, 2017 8:15 AM
To: user@spark.apache.org
Subject: HDFS or NFS as a cache?

I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write parquet 
files to S3. But the S3 performance for various reasons is bad when I access s3 
through the parquet write method:

df.write.parquet('s3a://bucket/parquet')
Now I want to set up a small cache for the parquet output. One output is about 
12-15 GB in size. Would it be enough to set up an NFS directory on the master, 
write the output to it and then move it to S3? Or should I set up HDFS on the 
master? Or should I even opt for an additional cluster running an HDFS solution 
on more than one node?
thanks!



RE: Error - Spark reading from HDFS via dataframes - Java

2017-10-02 Thread JG Perrin
@Anastasios: just a word of caution, this is the Spark 1.x CSV parser; there are 
a few (minor) changes for Spark 2.x. You can have a look at 
http://jgp.net/2017/10/01/loading-csv-in-spark/.
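
For the schema part of the question, a hedged Java sketch using the built-in 
Spark 2.x reader with an explicit schema (column names are taken from the 
conversion schema posted below; needs org.apache.spark.sql.types.*):

StructType schema = new StructType()
    .add("COL1", DataTypes.StringType)
    .add("COL2", DataTypes.StringType)
    .add("COL3", DataTypes.LongType)
    .add("COL4", DataTypes.StringType)
    .add("COL5", DataTypes.StringType)
    .add("COL6", DataTypes.LongType);

Dataset<Row> df = spark.read()
    .option("header", "false")
    .schema(schema)                      // no string-only columns, no RDD round trip
    .csv("hdfs:/inputpath/*");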

From: Anastasios Zouzias [mailto:zouz...@gmail.com]
Sent: Sunday, October 01, 2017 2:05 AM
To: Kanagha Kumar 
Cc: user @spark 
Subject: Re: Error - Spark reading from HDFS via dataframes - Java

Hi,

Set the inferSchema option to true in spark-csv. You may also want to set the 
mode option. See the readme below:

https://github.com/databricks/spark-csv/blob/master/README.md

Best,
Anastasios

On 01.10.2017 at 07:58, "Kanagha Kumar" wrote:
Hi,

I'm trying to read data from HDFS in spark as dataframes. Printing the schema, 
I see all columns are being read as strings. I'm converting it to RDDs and 
creating another dataframe by passing in the correct schema ( how the rows 
should be interpreted finally).

I'm getting the following error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid external 
type for schema of bigint


Spark read API:

Dataset<Row> hdfs_dataset = new SQLContext(spark).read().option("header", 
"false").csv("hdfs:/inputpath/*");

Dataset<Row> ds = new 
SQLContext(spark).createDataFrame(hdfs_dataset.toJavaRDD(), conversionSchema);
This is the schema to be converted to:
StructType(StructField(COL1,StringType,true),
StructField(COL2,StringType,true),
StructField(COL3,LongType,true),
StructField(COL4,StringType,true),
StructField(COL5,StringType,true),
StructField(COL6,LongType,true))

This is the original schema obtained once read API was invoked
StructType(StructField(_c1,StringType,true),
StructField(_c2,StringType,true),
StructField(_c3,StringType,true),
StructField(_c4,StringType,true),
StructField(_c5,StringType,true),
StructField(_c6,StringType,true))

My interpretation is even when a JavaRDD is cast to dataframe by passing in the 
new schema, values are not getting type casted.
This is occurring because the above read API reads data as string types from 
HDFS.

How can I  convert an RDD to dataframe by passing in the correct schema once it 
is read?
How can the values be type-cast correctly during this RDD to dataframe 
conversion?

Or how can I read data from HDFS with an input schema in java?
Any suggestions are helpful. Thanks!




RE: HDFS or NFS as a cache?

2017-09-29 Thread JG Perrin
You will collect in the driver (often the master) and it will save the data, so 
for saving, you will not have to set up HDFS.

From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
Sent: Friday, September 29, 2017 8:15 AM
To: user@spark.apache.org
Subject: HDFS or NFS as a cache?

I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write parquet 
files to S3. But the S3 performance for various reasons is bad when I access s3 
through the parquet write method:

df.write.parquet('s3a://bucket/parquet')
Now I want to set up a small cache for the parquet output. One output is about 
12-15 GB in size. Would it be enough to set up an NFS directory on the master, 
write the output to it and then move it to S3? Or should I set up HDFS on the 
master? Or should I even opt for an additional cluster running an HDFS solution 
on more than one node?
thanks!


RE: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread JG Perrin
On a test system, you can also use something like ownCloud/Nextcloud/Dropbox to 
ensure that the files are synchronized. Would not do it for TB of data ;) ...

-Original Message-
From: Jörn Franke [mailto:jornfra...@gmail.com] 
Sent: Friday, September 29, 2017 5:14 AM
To: Gaurav1809 
Cc: user@spark.apache.org
Subject: Re: [Spark-Submit] Where to store data files while running job in 
cluster mode?

You should use a distributed filesystem such as HDFS. If you want to use the 
local filesystem then you have to copy each file to each node.
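
A hedged illustration with made-up HDFS paths: copy the file to HDFS once, e.g.

hdfs dfs -put /home/username/sampleflightdata /data/sampleflightdata

and reference a URI that every worker can resolve (shown in Java here; the Scala 
call quoted below is analogous):

Dataset<Row> flightDF = spark.read()
    .option("header", "true")
    .csv("hdfs:///data/sampleflightdata");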

> On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
> 
> Hi All,
> 
> I have a multi-node (1 master, 2 workers) Spark cluster; 
> the job reads CSV file data and it works fine when run in local 
> mode (local[*]).
> However, when the same job is run in cluster mode (spark://HOST:PORT), 
> it is not able to read it.
> I want to know how to reference the files, or where to store them? 
> Currently the CSV data file is on the master (from where the job is submitted).
> 
> Following code works fine in local mode but not in cluster mode.
> 
> val spark = SparkSession
>  .builder()
>  .appName("SampleFlightsApp")
>  .master("spark://masterIP:7077") // change it to 
> .master("local[*]) for local mode
>  .getOrCreate()
> 
>val flightDF =
> spark.read.option("header",true).csv("/home/username/sampleflightdata")
>flightDF.printSchema()
> 
> Error: FileNotFoundException: File 
> file:/home/username/sampleflightdata does not exist
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 



RE: Loading objects only once

2017-09-28 Thread JG Perrin
Maybe load the model on each executor’s disk and load it from there? Depending 
on how you use the data/model, using something like Livy and sharing the same 
connection may help?
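
A hedged sketch of the SPARK-650 singleton idea on the JVM side (ModelHolder, 
Model and the local path are all hypothetical; a PySpark worker can do the 
analogous thing with a module-level global):

// Loaded at most once per executor JVM and reused by every task running there.
class ModelHolder {
  private static Model model;

  static synchronized Model get() {
    if (model == null) {
      model = Model.load("/mnt/models/model.bin");   // local path on each worker
    }
    return model;
  }
}

// In the job, fetch it per partition rather than per record:
Dataset<String> predictions = imagePaths.mapPartitions(
    (MapPartitionsFunction<String, String>) it -> {
      Model m = ModelHolder.get();
      List<String> out = new ArrayList<>();
      while (it.hasNext()) {
        out.add(m.predict(it.next()));
      }
      return out.iterator();
    },
    Encoders.STRING());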

From: Naveen Swamy [mailto:mnnav...@gmail.com]
Sent: Wednesday, September 27, 2017 9:08 PM
To: user@spark.apache.org
Subject: Loading objects only once

Hello all,

I am a new user to Spark, please bear with me if this has been discussed 
earlier.

I am trying to run batch inference using DL frameworks pre-trained models and 
Spark. Basically, I want to download a model(which is usually ~500 MB) onto the 
workers and load the model and run inference on images fetched from the source 
like S3 something like this
rdd = sc.parallelize(load_from_s3)
rdd.map(fetch_from_s3).map(read_file).map(predict)

I was able to get it running in local mode on Jupyter. However, I would like to 
load the model only once and not on every map operation. A setup hook that loads 
the model once into the JVM would have been nice. I came across this JIRA 
https://issues.apache.org/jira/browse/SPARK-650  which suggests that I can use 
Singleton and static initialization. I tried to do this using a Singleton 
metaclass following the thread here 
https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python. 
Following this failed miserably complaining that Spark cannot serialize ctype 
objects with pointer references.

After a lot of trial and error, I moved the code to a separate file by creating 
a static method for predict that checks if a class variable is set or not and 
loads the model if not set. This approach does not sound thread safe to me, So 
I wanted to reach out and see if there are established patterns on how to 
achieve something like this.


Also, I would like to understand the executor -> tasks -> Python process mapping. 
Does each task get mapped to a separate Python process? The reason I ask is that I 
want to be able to use the mapPartitions method to load a batch of files and run 
inference on them separately, for which I need to load the object once per task. 
Any pointers are appreciated.


Thanks for your time in answering my question.

Cheers, Naveen




RE: More instances = slower Spark job

2017-09-28 Thread JG Perrin
As the others have mentioned, your loading time might kill your benchmark… I am 
in a similar process right now, but I time each operation: load, process 1, 
process 2, etc. It is not always easy with lazy operators, but you can try to 
force operations with a dummy collect and cache (for benchmarking purposes).

Also, give processing more importance (unless you really only want to have this 
light processing). Heavy computation (ML for example) should show a difference, 
but it may not be your use case.
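
A hedged illustration of timing each step with a forced action (the source path 
and regex are placeholders; assumes 
import static org.apache.spark.sql.functions.col):

long t0 = System.currentTimeMillis();
Dataset<Row> df = spark.read().text("s3a://bucket/input/").cache();
long total = df.count();                          // forces the load
long t1 = System.currentTimeMillis();
Dataset<Row> kept = df.filter(col("value").rlike("pattern"));
long remaining = kept.count();                    // forces the filter
long t2 = System.currentTimeMillis();
System.out.printf("load: %d ms, filter: %d ms (%d -> %d lines)%n",
    t1 - t0, t2 - t1, total, remaining);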

From: Sonal Goyal [mailto:sonalgoy...@gmail.com]
Sent: Thursday, September 28, 2017 4:30 AM
To: Tejeshwar J1 
Cc: Jeroen Miller ; user@spark.apache.org
Subject: Re: More instances = slower Spark job

Also check if the compression algorithm you use is splittable?

Thanks,
Sonal
Nube Technologies




On Thu, Sep 28, 2017 at 2:17 PM, Tejeshwar J1 wrote:
Hi Miller,

Try using:
1. coalesce(numberOfPartitions) to reduce the number of partitions in order to 
avoid idle cores.
2. Reducing executor memory as you increase the number of executors.
3. Performing GC, or changing from the default Java serialization to Kryo 
serialization.


Thanks,
Tejeshwar


From: Jeroen Miller 
[mailto:bluedasya...@gmail.com]
Sent: Thursday, September 28, 2017 2:11 PM
To: user@spark.apache.org
Subject: More instances = slower Spark job

Hello,

I am experiencing a disappointing performance issue with my Spark jobs
as I scale up the number of instances.

The task is trivial: I am loading large (compressed) text files from S3,
filtering out lines that do not match a regex, counting the numbers
of remaining lines and saving the resulting datasets as (compressed)
text files on S3. Nothing that a simple grep couldn't do, except that
the files are too large to be downloaded and processed locally.

On a single instance, I can process X GBs per hour. When scaling up
to 10 instances, I noticed that processing the /same/ amount of data
actually takes /longer/.

This is quite surprising as the task is really simple: I was expecting
a significant speed-up. My naive idea was that each executors would
process a fraction of the input file, count the remaining lines /locally/,
and save their part of the processed file /independently/, thus no data
shuffling would occur.

Obviously, this is not what is happening.

Can anyone shed some light on this or provide pointers to relevant
information?

Regards,

Jeroen




RE: Debugging Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2017-09-26 Thread JG Perrin
Not using YARN, just a standalone cluster with 2 nodes here (physical, not even 
VMs). The network seems good between the nodes.

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: Tuesday, September 26, 2017 10:39 AM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: Debugging Initial job has not accepted any resources; check your 
cluster UI to ensure that workers are registered and have sufficient resources

I would check the queue you are submitting job, assuming it is yarn...

On Tue, Sep 26, 2017 at 11:40 PM, JG Perrin <jper...@lumeris.com> wrote:
Hi,

I get the infamous:
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I run the app via Eclipse, connecting:
SparkSession spark = SparkSession.builder()
.appName("Converter - Benchmark")
.master(ConfigurationManager.getMaster())
.config("spark.cores.max", "4")
.config("spark.executor.memory", "16g")
.getOrCreate();


Everything seems ok on the cluster side:
[cluster UI screenshot omitted]


I probably missed something super obvious, but can’t find it…

Any help/hint is welcome! - TIA

jg






--
Best Regards,
Ayan Guha


Debugging Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2017-09-26 Thread JG Perrin
Hi,

I get the infamous:
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I run the app via Eclipse, connecting:
SparkSession spark = SparkSession.builder()
.appName("Converter - Benchmark")
.master(ConfigurationManager.getMaster())
.config("spark.cores.max", "4")
.config("spark.executor.memory", "16g")
.getOrCreate();


Everything seems ok on the cluster side:
[cluster UI screenshot omitted]


I probably missed something super obvious, but can't find it...

Any help/hint is welcome! - TIA

jg





[Structured Streaming] Multiple sources best practice/recommendation

2017-09-13 Thread JG Perrin
Hi,

I have different files being dumped on S3, and I want to ingest them and join them.

What sounds better to you: have one "directory" for all, or one per file format?

If I have one directory for all, can you get some metadata about the file, like 
its name?

If there are multiple directories, how can I have multiple "listeners"?

Thanks

jg
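
One hedged sketch for the single-directory option: the input_file_name() function 
can tag each row with the file it came from (the schema and path are placeholders; 
assumes import static org.apache.spark.sql.functions.input_file_name):

Dataset<Row> stream = spark.readStream()
    .schema(landingSchema)                     // placeholder schema
    .json("s3a://bucket/landing/");            // placeholder path
Dataset<Row> withSource = stream.withColumn("source_file", input_file_name());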



RE: CSV write to S3 failing silently with partial completion

2017-09-07 Thread JG Perrin
Are you assuming that all partitions are of equal size? Did you try with more 
partitions (like repartitioning)? Does the error always happen with the last 
(or smaller) file? If you are sending to redshift, why not use the JDBC driver?
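
A hedged sketch of that JDBC route (the URL, table and credentials are 
placeholders, and the Redshift JDBC driver would need to be on the classpath):

df.write()
  .format("jdbc")
  .option("url", "jdbc:redshift://your-cluster:5439/yourdb")
  .option("dbtable", "public.target_table")
  .option("user", "username")
  .option("password", "password")
  .mode("append")
  .save();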

-Original Message-
From: abbim [mailto:ab...@amazon.com] 
Sent: Thursday, September 07, 2017 1:02 AM
To: user@spark.apache.org
Subject: CSV write to S3 failing silently with partial completion

Hi all,
My team has been experiencing a recurring unpredictable bug where only a 
partial write to CSV in S3 on one partition of our Dataset is performed. For 
example, in a Dataset of 10 partitions written to CSV in S3, we might see 9 of 
the partitions as 2.8 GB in size, but one of them as 1.6 GB. However, the job 
does not exit with an error code.

This becomes problematic in the following ways:
1. When we copy the data to Redshift, we get a bad decrypt error on the partial 
file, suggesting that the failure occurred at a weird byte in the file. 
2. We lose data - sometimes as much as 10%.

We don't see this problem with parquet format, which we also use, but moving 
all of our data to parquet is not currently feasible. We're using the Java API 
with Spark 2.2 and Amazon EMR 5.8, code is a simple as this:
df.write().csv("s3://some-bucket/some_location"). We're experiencing the issue 
1-3x/week on a daily job and are unable to reliably reproduce the problem. 

Any thoughts on why we might be seeing this and how to resolve?
Thanks in advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/






RE: Problem with CSV line break data in PySpark 2.1.0

2017-09-05 Thread JG Perrin
Have you tried the built-in parser, not the Databricks one (which is not really 
used anymore)?
What does your original CSV look like?
What does your code look like? There are quite a few options for reading a CSV…
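
For reference, a hedged example of the built-in reader (shown in Java; the 
PySpark call is analogous). Note that the multiLine option, which handles line 
breaks inside quoted fields, only arrived in Spark 2.2, so on 2.1.0 it would mean 
upgrading or pre-cleaning the file:

Dataset<Row> df = spark.read()
    .option("header", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("path/to/file.csv");       // placeholder path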

From: Aakash Basu [mailto:aakash.spark@gmail.com]
Sent: Sunday, September 03, 2017 5:16 AM
To: user 
Subject: Problem with CSV line break data in PySpark 2.1.0

Hi,

I have a dataset where a few rows of column F, as shown below, have line breaks 
in the CSV file.

[Inline image 1]

When Spark is reading it, it is coming as below, which is a complete new line.

[Inline image 2]

I want my PySpark 2.1.0 to read it by forcefully avoiding the line break after 
the date, which is not happening as I am using the com.databricks CSV reader. And 
nulls are getting created after the date for line 2 for the rest of the columns 
from G till the end.

Can I please be helped how to handle this?

Thanks,
Aakash.



RE: from_json()

2017-08-30 Thread JG Perrin
Hey Sam,

Nope – it does not work the way I want. I guess it is only working with one 
type…

Trying to convert:
{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1,"authorId":1,"title":"Fantastic
 Beasts and Where to Find Them: The Original Screenplay"}

I get:
[Executor task launch worker for task 3:ERROR] Logging$class: Exception in task 
0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: Failed to convert the JSON string 
'{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1,"authorId":1,"title":"Fantastic
 Beasts and Where to Find Them: The Original Screenplay"}' to a data type.
   at org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:176)
   at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:108)
   at org.apache.spark.sql.types.DataType.fromJson(DataType.scala)
   at 
net.jgp.labs.spark.l250_map.l031_dataset_book_json_in_progress.CsvToDatasetBookAsJson$BookMapper.call(CsvToDatasetBookAsJson.java:44)
   at 
net.jgp.labs.spark.l250_map.l031_dataset_book_json_in_progress.CsvToDatasetBookAsJson$BookMapper.call(CsvToDatasetBookAsJson.java:1)
   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
   at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   at org.apache.spark.scheduler.Task.run(Task.scala:108)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
   at java.lang.Thread.run(Unknown Source)
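
A hedged aside: DataType.fromJson() expects the JSON representation of a schema, 
i.e. what StructType.json() produces, rather than a JSON document, which matches 
the failure above. A small sketch of the distinction:

// Schema JSON, which DataType.fromJson() understands:
StructType schema = new StructType()
    .add("id", DataTypes.IntegerType)
    .add("title", DataTypes.StringType);
String schemaJson = schema.json();                       // {"type":"struct","fields":[...]}
DataType roundTripped = DataType.fromJson(schemaJson);   // works

// A data document like the book record above has to go through a reader instead:
Dataset<Row> parsed = spark.read().json(jsonDataset);    // jsonDataset: a Dataset<String> of rows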




From: JG Perrin [mailto:jper...@lumeris.com]
Sent: Monday, August 28, 2017 1:29 PM
To: Sam Elamin <hussam.ela...@gmail.com>
Cc: user@spark.apache.org
Subject: RE: from_json()

Thanks Sam – this might be the solution. I will investigate!

From: Sam Elamin [mailto:hussam.ela...@gmail.com]
Sent: Monday, August 28, 2017 1:14 PM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: from_json()

Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new schema 
from a df it's fairly simple, assuming you have a schema already predefined or 
in a string, i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema:

sqlContext.createDataFrame(oldDF.rdd,newSchema)
Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin <jper...@lumeris.com> wrote:
Is there a way to not have to specify a schema when using from_json() or infer 
the schema? When you read a JSON doc from disk, you can infer the schema. 
Should I write it to disk before (ouch)?

jg





RE: from_json()

2017-08-28 Thread JG Perrin
Thanks Sam – this might be the solution. I will investigate!

From: Sam Elamin [mailto:hussam.ela...@gmail.com]
Sent: Monday, August 28, 2017 1:14 PM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: from_json()

Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new schema 
from a df it's fairly simple, assuming you have a schema already predefined or 
in a string, i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema:

sqlContext.createDataFrame(oldDF.rdd,newSchema)
Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin <jper...@lumeris.com> wrote:
Is there a way to not have to specify a schema when using from_json() or infer 
the schema? When you read a JSON doc from disk, you can infer the schema. 
Should I write it to disk before (ouch)?

jg





from_json()

2017-08-28 Thread JG Perrin
Is there a way to not have to specify a schema when using from_json() or infer 
the schema? When you read a JSON doc from disk, you can infer the schema. 
Should I write it to disk before (ouch)?

jg



RE: add me to email list

2017-08-28 Thread JG Perrin
Hey Mike,

You need to do it yourself, it’s really easy: 
http://spark.apache.org/community.html.

hih

jg

From: Michael Artz [mailto:michaelea...@gmail.com]
Sent: Monday, August 28, 2017 7:43 AM
To: user@spark.apache.org
Subject: add me to email list

Hi,
  Please add me to the email list
Mike



RE: Joining 2 dataframes, getting result as nested list/structure in dataframe

2017-08-24 Thread JG Perrin
Thanks Michael – this is a great article… very helpful

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Wednesday, August 23, 2017 4:33 PM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: Joining 2 dataframes, getting result as nested list/structure in 
dataframe

You can create a nested struct that contains multiple columns using struct().

Here's a pretty complete guide on working with nested data: 
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
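
Applied to the example below, a hedged sketch of that struct() approach collects 
whole rows of the right-hand columns instead of a single column (c60 is a made-up 
extra column; assumes import static org.apache.spark.sql.functions.*):

Dataset<Row> aggDf = allDf.groupBy(cDf.col("c1"), cDf.col("c2"))
    .agg(collect_list(struct(col("c50"), col("c60"))).alias("c50_c60_rows"));
aggDf.printSchema();   // collect_list(...) becomes an array of struct<c50, c60>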

On Wed, Aug 23, 2017 at 2:30 PM, JG Perrin <jper...@lumeris.com> wrote:
Hi folks,

I am trying to join 2 dataframes, but I would like to have the result as a list 
of rows of the right dataframe (dDf in the example) in a column of the left 
dataframe (cDf in the example). I made it work with one column, but having 
issues adding more columns/creating a row(?).
Seq<String> joinColumns = new Set2<>("c1", "c2").toSeq();
Dataset<Row> allDf = cDf.join(dDf, joinColumns, "inner");
allDf.printSchema();
allDf.show();

Dataset<Row> aggDf = allDf.groupBy(cDf.col("c1"), cDf.col("c2"))
.agg(collect_list(col("c50")));
aggDf.show();

Output:
++---+---+
|c1  |c2 |collect_list(c50)  |
++---+---+
|3744|1160242| [6, 5, 4, 3, 2, 1]|
|3739|1150097|[1]|
|3780|1159902|[5, 4, 3, 2, 1]|
| 132|1200743|   [4, 3, 2, 1]|
|3778|1183204|[1]|
|3766|1132709|[1]|
|3835|1146169|[1]|
++---+---+

Thanks,

jg






Joining 2 dataframes, getting result as nested list/structure in dataframe

2017-08-23 Thread JG Perrin
Hi folks,

I am trying to join 2 dataframes, but I would like to have the result as a list 
of rows of the right dataframe (dDf in the example) in a column of the left 
dataframe (cDf in the example). I made it work with one column, but having 
issues adding more columns/creating a row(?).
Seq<String> joinColumns = new Set2<>("c1", "c2").toSeq();
Dataset<Row> allDf = cDf.join(dDf, joinColumns, "inner");
allDf.printSchema();
allDf.show();

Dataset<Row> aggDf = allDf.groupBy(cDf.col("c1"), cDf.col("c2"))
.agg(collect_list(col("c50")));
aggDf.show();

Output:
++---+---+
|c1  |c2 |collect_list(c50)  |
++---+---+
|3744|1160242| [6, 5, 4, 3, 2, 1]|
|3739|1150097|[1]|
|3780|1159902|[5, 4, 3, 2, 1]|
| 132|1200743|   [4, 3, 2, 1]|
|3778|1183204|[1]|
|3766|1132709|[1]|
|3835|1146169|[1]|
++---+---+

Thanks,

jg
