Swift question regarding in-memory snapshots of compact table data

2016-11-09 Thread Daniel Schulz

Hello,

I have a quick question about Spark for you.

We'd like to run various calculations that are all based on the very same 
Thresholds. These are compact, domain-defined data: essentially a key/value 
map (from Text to Text) with approx. 300 entries, i.e. a (300 x 2) matrix. It 
is crucial to us that all entries are read once and provided as immutable at 
the very start of the runtime. This must hold even if the persisted data on 
disk changes, and even when a Worker Node goes down and needs to restart "from 
the beginning." The timeline looks as follows:


--|----------|----------|----------|--> t
  start      t1         t2         end

t1 read all Thresholds
t2 start all calculations based on Thresholds


It is crucial that the Thresholds, once read, do not change in memory, even if 
the persisted data on disk does. The values must be identical on all Worker 
Nodes, even those that start very late or restart all over again. Our Spark 
application is not a streaming application. For the calculations to be 
correct, all Worker Nodes need the very same data, regardless of their start 
time.

What do you consider the best way to provide the Thresholds to the Worker 
Nodes for this kind of calculation?

  1.  as a Scala variable (no RDD), which is not lazily evaluated and is 
handed over to the calculations
  2.  as an RDD/DataFrame in Spark, without Broadcast
  3.  as a broadcast RDD/DataFrame in Spark
  4.  a very different approach (please elaborate a bit)

What are possible shortcomings of not using Broadcast? What are possible 
shortcomings of using Broadcast?

Thanks a lot.

Kind regards, Daniel.
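
For illustration only, here is a minimal sketch of the "read once, then treat 
as immutable" idea behind options 1 and 3, written in plain Python rather than 
Spark. The function name load_thresholds, the key=value line format and the 
sample keys are all assumptions made for this example; they do not come from 
the question.

```python
from types import MappingProxyType


def load_thresholds(lines):
    """Parse 'key=value' lines once and return a read-only snapshot."""
    parsed = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        parsed[key.strip()] = value.strip()
    # MappingProxyType is a read-only view; the private dict is never handed out.
    return MappingProxyType(parsed)


# t1: read exactly once (hypothetical sample data standing in for the file on disk).
THRESHOLDS = load_thresholds(["max_temp=90", "min_temp=10"])


# t2 and later: every calculation receives the same snapshot.
def exceeds(value, thresholds=THRESHOLDS):
    return value > int(thresholds["max_temp"])
```

In a Spark job, a snapshot like this would live on the driver and could be 
handed to the tasks through SparkContext.broadcast or plain closure capture, 
so later changes to the file on disk would not be re-read. A mappingproxy 
object cannot be pickled, so a plain copy (dict(THRESHOLDS)) would be what 
actually gets broadcast.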





RE: how to merge dataframe write output files

2016-11-09 Thread Shreya Agarwal
Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

Not to mention that it’ll make your write incredibly slow and also it’ll take 
away all the speed of reading in the data from a parquet as there won’t be any 
parallelism at the time of input (if you try to input this parquet).

Again, the important question is – Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it’ll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.
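
If the goal is simply fewer, larger files rather than exactly one, a middle 
ground is to pick the number of output partitions from the data size. The 
sketch below shows only that arithmetic; the 128 MB target and the sample 
sizes are assumptions, not figures from this thread.

```python
import math


def target_partitions(total_bytes, target_file_bytes=128 * 1024 * 1024):
    """Number of output files needed so each is roughly target_file_bytes."""
    if total_bytes <= 0:
        return 1
    return max(1, math.ceil(total_bytes / target_file_bytes))


# Hypothetical figures: about 180 part files of about 16 KB each.
print(target_partitions(180 * 16 * 1024))   # 1
# Hypothetical 10 GiB data set.
print(target_partitions(10 * 1024 ** 3))    # 80
```

The result would then be passed to df.coalesce(n) or df.repartition(n) before 
the write, which keeps some parallelism whenever n is greater than 1.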

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark 
Subject: how to merge dataframe write output files

hi, all:
When I call the API df.write.parquet, there are a lot of small files. How can 
I merge them into one file? I tried df.coalesce(1).write.parquet, but it 
sometimes fails with an error:

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc
more and more...
2016-11-10

lk_spark


Akka Stream as the source for Spark Streaming. Please advice...

2016-11-09 Thread shyla deshpande
I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
Spark Streaming and Cassandra using Structured Streaming. But the kafka
source support for Structured Streaming is not yet available. So now I am
trying to use Akka Stream as the source to Spark Streaming.

Want to make sure I am heading in the right direction. Please direct me to
any sample code and reading material for this.

Thanks


how to merge dataframe write output files

2016-11-09 Thread lk_spark
hi, all:
When I call the API df.write.parquet, there are a lot of small files. How can 
I merge them into one file? I tried df.coalesce(1).write.parquet, but it 
sometimes fails with an error:

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc
more and more...
2016-11-10


lk_spark 

Unable to launch Python Web Application on Spark Cluster

2016-11-09 Thread anjali gautam
Hello Everyone,

I have developed a web application (say abc) in Python using web.py. I want
to deploy it on the Spark cluster. Since the application is a project with
dependencies, I have made a zip file of the project (abc.zip) to be deployed
on the cluster. In the project abc I have to execute the application.py file,
which has the main method that starts my web application. The command to
execute the project is:

bin/spark-submit --master spark://master:7077 --py-files abc.zip application.py

The project abc has a data folder containing a file data.txt. The other
Python files in the project access that data.txt.

After executing the above command, opening the browser to access the web
application results in the error "data/data.txt not found", although the file
is actually present.

I have been trying this for a long time, but every time it shows the same
error. Can anybody help me with this? Please let me know if any other info is
required.

Thanks & Regards,
Anjali
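
One possible cause, offered as an assumption that this thread does not 
confirm: files shipped with --py-files stay inside abc.zip, which is put on 
the Python path without being unpacked, so a relative open("data/data.txt") 
looks in the working directory and finds nothing. The sketch below builds a 
throwaway archive to show the difference. The helper read_from_archive is a 
hypothetical name, and the archive created here only stands in for the real 
abc.zip.

```python
import os
import tempfile
import zipfile


def read_from_archive(archive_path, member):
    """Return the text of `member` stored inside the zip at `archive_path`."""
    with zipfile.ZipFile(archive_path) as archive:
        return archive.read(member).decode("utf-8")


# Build a stand-in for abc.zip containing data/data.txt.
tmp_dir = tempfile.mkdtemp()
archive_path = os.path.join(tmp_dir, "abc.zip")
with zipfile.ZipFile(archive_path, "w") as archive:
    archive.writestr("data/data.txt", "hello\n")

# The file exists inside the archive, but not as a regular file on disk.
on_disk = os.path.exists(os.path.join(tmp_dir, "data", "data.txt"))
contents = read_from_archive(archive_path, "data/data.txt")
print(on_disk, repr(contents))
```

If this is what is happening, reading the data through the archive, or 
shipping data.txt separately and resolving it by absolute path, would be two 
ways to check.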


Hive Queries are running very slowly in Spark 2.0

2016-11-09 Thread Jaya Shankar Vadisela
Hi all,

I have the simple Hive query below. Our use case runs multiple Hive queries 
in parallel, 16 in our case (the number of cores on our machine, using a Scala 
par array). In Spark 1.6 they execute in 10 seconds, but in Spark 2.0 the same 
queries take 5 minutes.

"select * from emp as e join dept as d on e.dept_id = d.dept_id where e.dept_id 
= 100"

Can someone help me figure out what could be wrong?

regards, Jay


Re: Access_Remote_Kerberized_Cluster_Through_Spark

2016-11-09 Thread Ajay Chander
Hi Everyone,

I am still trying to figure this one out. I am stuck with this error:
"java.io.IOException: Can't get Master Kerberos principal for use as renewer".
Below is my code. Can any of you please provide any insights on this? Thanks
for your time.


import java.io.{BufferedInputStream, File, FileInputStream}
import java.net.URI

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SparkConf, SparkContext}


object SparkHdfs {

  def main(args: Array[String]): Unit = {

    System.setProperty("java.security.krb5.conf", new File("src\\main\\files\\krb5.conf").getAbsolutePath)
    System.setProperty("sun.security.krb5.debug", "true")

    val sparkConf = new SparkConf().setAppName("SparkHdfs").setMaster("local")
    val sc = new SparkContext(sparkConf)

    // Loading remote cluster configurations.
    // Note: addResource(String) treats its argument as a classpath resource name;
    // only the Path overload used further down reads from the local filesystem.
    sc.hadoopConfiguration.addResource(new File("src\\main\\files\\core-site.xml").getAbsolutePath)
    sc.hadoopConfiguration.addResource(new File("src\\main\\files\\hdfs-site.xml").getAbsolutePath)
    sc.hadoopConfiguration.addResource(new File("src\\main\\files\\mapred-site.xml").getAbsolutePath)
    sc.hadoopConfiguration.addResource(new File("src\\main\\files\\yarn-site.xml").getAbsolutePath)
    sc.hadoopConfiguration.addResource(new File("src\\main\\files\\ssl-client.xml").getAbsolutePath)
    sc.hadoopConfiguration.addResource(new File("src\\main\\files\\topology.map").getAbsolutePath)

    val conf = new Configuration()
    // Loading remote cluster configurations
    conf.addResource(new Path(new File("src\\main\\files\\core-site.xml").getAbsolutePath))
    conf.addResource(new Path(new File("src\\main\\files\\hdfs-site.xml").getAbsolutePath))
    conf.addResource(new Path(new File("src\\main\\files\\mapred-site.xml").getAbsolutePath))
    conf.addResource(new Path(new File("src\\main\\files\\yarn-site.xml").getAbsolutePath))
    conf.addResource(new Path(new File("src\\main\\files\\ssl-client.xml").getAbsolutePath))
    conf.addResource(new Path(new File("src\\main\\files\\topology.map").getAbsolutePath))

    conf.set("hadoop.security.authentication", "Kerberos")

    UserGroupInformation.setConfiguration(conf)

    UserGroupInformation.loginUserFromKeytab("my...@internal.company.com",
      new File("src\\main\\files\\myusr.keytab").getAbsolutePath)

    // SparkHadoopUtil.get.loginUserFromKeytab("tsad...@internal.imsglobal.com",
    //   new File("src\\main\\files\\tsadusr.keytab").getAbsolutePath)
    // Getting this error: java.io.IOException: Can't get Master Kerberos principal for use as renewer

    sc.textFile("hdfs://vm1.comp.com:8020/user/myusr/temp/file1").collect().foreach(println)
    // Getting this error: java.io.IOException: Can't get Master Kerberos principal for use as renewer

  }
}




On Mon, Nov 7, 2016 at 9:42 PM, Ajay Chander  wrote:

> Did anyone use
> https://www.codatlas.com/github.com/apache/spark/HEAD/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
> to interact with secured Hadoop from Spark?
>
> Thanks,
> Ajay
>
> On Mon, Nov 7, 2016 at 4:37 PM, Ajay Chander  wrote:
>
>>
>> Hi Everyone,
>>
>> I am trying to develop a simple codebase on my machine to read data from
>> secured Hadoop cluster. We have a development cluster which is secured
>> through Kerberos and I want to run a Spark job from my IntelliJ to read
>> some sample data from the cluster. Has anyone done this before ? Can you
>> point me to some sample examples?
>>
>> I understand that, if we want to talk to a secured cluster, we need to have
>> a keytab and principal. I tried using them through
>> UserGroupInformation.loginUserFromKeytab and
>> SparkHadoopUtil.get.loginUserFromKeytab but so far no luck.
>>
>> I have been trying to do this for quite a while. Please let me know
>> if you need more info. Thanks
>>
>> Regards,
>> Ajay
>>
>
>


Re: Issue Running sparkR on YARN

2016-11-09 Thread Felix Cheung
It may be that the Spark executor is running as a different user and can't 
see where Rscript is.

You might want to try adding the Rscript path to PATH.

Also please see this for the config property to set for the R command to use:
https://spark.apache.org/docs/latest/configuration.html#sparkr
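
To illustrate the PATH point: whether a command resolves depends only on the 
PATH of the process doing the lookup, and a script under /etc/profile.d is 
typically sourced by login shells only, so a YARN container may not see that 
edit (an assumption about this setup, not something verified here). The 
sketch uses temporary directories and a fake Rscript as stand-ins; none of 
the paths are the real ones.

```python
import os
import shutil
import stat
import tempfile


def visible_on_path(command, path_value):
    """Return where `command` resolves for the given PATH string, or None."""
    return shutil.which(command, path=path_value)


# Hypothetical stand-ins: one directory holding Rscript, one without it.
r_bin_dir = tempfile.mkdtemp()
system_dir = tempfile.mkdtemp()
fake_rscript = os.path.join(r_bin_dir, "Rscript")
with open(fake_rscript, "w") as handle:
    handle.write("#!/bin/sh\n")
os.chmod(fake_rscript, stat.S_IRWXU)  # mark it executable for the owner

login_shell_path = system_dir + os.pathsep + r_bin_dir  # PATH with the profile edit
container_path = system_dir                             # PATH without it

print(visible_on_path("Rscript", login_shell_path))  # resolves inside r_bin_dir
print(visible_on_path("Rscript", container_path))    # None
```

Running the same kind of check from inside a task, against the PATH the 
executor actually has, would show whether this is the cause.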



_
From: ian.malo...@tdameritrade.com
Sent: Wednesday, November 9, 2016 12:12 PM
Subject: Issue Running sparkR on YARN
To: >


Hi,

I'm trying to run sparkR (1.5.2) on YARN and I get:

java.io.IOException: Cannot run program "Rscript": error=2, No such file or 
directory

This strikes me as odd, because I can go to each node and various users and 
type Rscript and it works. I've done this on each node and spark-env.sh as 
well: export R_HOME=/path/to/R

This is how I'm setting it on the nodes (/etc/profile.d/path_edit.sh):

export R_HOME=/app/hdp_app/anaconda/bin/R
PATH=$PATH:/app/hdp_app/anaconda/bin

Any ideas?

Thanks,

Ian






Re: importing data into hdfs/spark using Informatica ETL tool

2016-11-09 Thread Michael Segel
Oozie, a product only a mad Russian would love. ;-)

Just say no to hive. Go from Flat to Parquet.
(This sounds easy, but there’s some work that has to occur…)

Sorry for being cryptic, Mich’s question is pretty much generic for anyone 
building a data lake so it ends up overlapping with some work that I have to do…

-Mike

On Nov 9, 2016, at 4:16 PM, Mich Talebzadeh wrote:

Thanks guys,

Sounds like let Informatica get the data out of RDBMS and create mapping to 
flat files that will be delivered to a directory visible by HDFS host. Then 
push the csv files into HDFS. then there are number of options to work on:


  1.  run cron or oozie to get data out of HDFS (or build external Hive table 
on that directory) and do insert/select into Hive managed table
  2.  alternatively use a spark job to get CSV data into RDD and then create 
tempTable and do insert/select from tempTable to Hive table. Bear in mind that 
we need a spark job tailored to each table schema

I believe the above is feasible?





Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 9 November 2016 at 21:26, Jörn Franke wrote:
Basically you mention the options. However, there are several ways how 
informatica can extract (or store) from/to rdbms. If the native option is not 
available then you need to go via JDBC as you have described.
Alternatively (but only if it is worth it) you can schedule fetching of the 
files via oozie and use it to convert the csv into orc/ parquet etc.
If this is a common use case in the company you can extend informatica with 
Java classes that for instance convert the data directly into parquet or orc. 
However, is some effort.

On 9 Nov 2016, at 14:56, Mich Talebzadeh wrote:

Hi,

I am exploring the idea of flexibility with importing multiple RDBMS tables 
using Informatica that customer has into HDFS.

I don't want to use connectivity tools from Informatica to Hive etc.

So this is what I have in mind


  1.  If possible get the tables data out using Informatica and use Informatica 
ui to convert RDBMS data into some form of CSV, TSV file (Can Informatica do 
it?) I guess yes
  2.  Put the flat files on an edge where HDFS node can see them.
  3.  Assuming that a directory can be created by Informatica daily, 
periodically run a cron that ingest that data from directories into HDFS 
equivalent daily directories
  4.  Once the data is in HDFS one can use, Spark csv, Hive etc to query data

The problem I have is to see if someone has done such thing before. 
Specifically can Informatica create target flat files on normal directories.

Any other generic alternative?

Thanks

Dr Mich Talebzadeh









Re: importing data into hdfs/spark using Informatica ETL tool

2016-11-09 Thread Mich Talebzadeh
Thanks guys,

It sounds like the plan is to let Informatica get the data out of the RDBMS
and create a mapping to flat files that will be delivered to a directory
visible to the HDFS host, then push the CSV files into HDFS. After that there
are a number of options to work on:


   1. run cron or Oozie to get the data out of HDFS (or build an external
   Hive table on that directory) and do an insert/select into a Hive managed
   table
   2. alternatively, use a Spark job to get the CSV data into an RDD, then
   create a tempTable and do an insert/select from the tempTable into the
   Hive table. Bear in mind that we need a Spark job tailored to each table
   schema


I believe the above is feasible?
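
As a rough sketch of the "push the CSV files into HDFS" step that a daily 
cron job could run, the function below only builds the two hdfs dfs commands 
for one day's directory and does not execute them. The directory layout, the 
root paths and the function name ingest_commands are assumptions made for 
this example.

```python
from datetime import date


def ingest_commands(local_root, hdfs_root, day):
    """Build the commands that copy one day's flat-file directory into HDFS."""
    stamp = day.strftime("%Y-%m-%d")
    local_dir = f"{local_root}/{stamp}"
    return [
        # Make sure the landing directory exists.
        ["hdfs", "dfs", "-mkdir", "-p", hdfs_root],
        # Copy the whole daily directory; it lands as <hdfs_root>/<stamp>.
        ["hdfs", "dfs", "-put", local_dir, hdfs_root],
    ]


# Hypothetical paths for illustration.
for command in ingest_commands("/data/informatica", "/landing/rdbms", date(2016, 11, 9)):
    print(" ".join(command))
```

A cron entry or an Oozie action would then run these commands once per day, 
after Informatica has finished writing that day's directory.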





Dr Mich Talebzadeh






On 9 November 2016 at 21:26, Jörn Franke  wrote:

> Basically you mention the options. However, there are several ways how
> informatica can extract (or store) from/to rdbms. If the native option is
> not available then you need to go via JDBC as you have described.
> Alternatively (but only if it is worth it) you can schedule fetching of
> the files via oozie and use it to convert the csv into orc/ parquet etc.
> If this is a common use case in the company you can extend informatica
> with Java classes that for instance convert the data directly into parquet
> or orc. However, is some effort.
>
> On 9 Nov 2016, at 14:56, Mich Talebzadeh wrote:
>
> Hi,
>
> I am exploring the idea of flexibility with importing multiple RDBMS
> tables using Informatica that customer has into HDFS.
>
> I don't want to use connectivity tools from Informatica to Hive etc.
>
> So this is what I have in mind
>
>
>    1. If possible get the tables data out using Informatica and use
>    Informatica ui  to convert RDBMS data into some form of CSV, TSV file (Can
>    Informatica do it?) I guess yes
>    2. Put the flat files on an edge where HDFS node can see them.
>    3. Assuming that a directory can be created by Informatica daily,
>    periodically run a cron that ingest that data from directories into HDFS
>    equivalent daily directories
>    4. Once the data is in HDFS one can use, Spark csv, Hive etc to query
>    data
>
> The problem I have is to see if someone has done such thing before.
> Specifically can Informatica create target flat files on normal directories.
>
> Any other generic alternative?
>
> Thanks
>
> Dr Mich Talebzadeh
>


How to interpret the Time Line on "Details for Stage" Spark UI page

2016-11-09 Thread Xiaoye Sun
Hi,

I am using Spark 1.6.1, and I am looking at the Event Timeline on "Details
for Stage" Spark UI web page in detail.

I found that the "scheduler delay" on event timeline is somehow
misrepresented. I want to confirm if my understanding is correct.

Here is the detailed description:
In Spark's code, I found that the definition of "SCHEDULER_DELAY" is that
"scheduler delay includes time to ship the task from the scheduler to the
executor, and time to send the task result from the executor to the
scheduler. If scheduler delay is large, consider decreasing the size of
tasks or decreasing the size of task results"

My interpretation of the definition is that the scheduler delay has two
components. The first component happens at the beginning of a task, when the
scheduler assigns the task executable to the executor; the second component
happens at the end of a task, when the scheduler collects the results from
the executor.

However, on the event timeline figure, there is only one section for the
scheduler delay at the beginning of each task, whose length represents the
SUM of these two components. This means that the following "Task
Deserialization Time", "Shuffle Read Time", "Executor Computing Time",
etc., should have started earlier on this event timeline figure.


Best,
Xiaoye
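
A note on why the timeline may only be able to show a single block: as far as 
I recall from the UI code in the 1.6 line (an assumption worth checking 
against the source), scheduler delay is not measured directly but derived as 
the residual of the task's wall-clock time after subtracting the measured 
phases. A residual has no start or end timestamp of its own. The sketch below 
shows that arithmetic; the parameter names and the sample numbers are 
illustrative.

```python
def scheduler_delay(launch_time, finish_time, executor_run_time,
                    deserialize_time, result_serialization_time,
                    getting_result_time):
    """Residual of the task's wall-clock time after the measured phases (ms)."""
    total = finish_time - launch_time
    measured = (executor_run_time + deserialize_time
                + result_serialization_time + getting_result_time)
    # Clamp at zero so clock skew cannot produce a negative delay.
    return max(0, total - measured)


# Hypothetical task: 1000 ms wall clock, 930 ms covered by measured phases.
print(scheduler_delay(0, 1000, 900, 20, 5, 5))  # 70
```

With only this one number available, the split between the "ship the task" 
part and the "send the result back" part cannot be recovered for drawing.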


Re: Aggregations on every column on dataframe causing StackOverflowError

2016-11-09 Thread Michael Armbrust
It would be great if you could try with the 2.0.2 RC.  Thanks for creating
an issue.

On Wed, Nov 9, 2016 at 1:22 PM, Raviteja Lokineni <
raviteja.lokin...@gmail.com> wrote:

> Well I've tried with 1.5.2, 1.6.2 and 2.0.1
>
> FYI, I have created https://issues.apache.org/jira/browse/SPARK-18388
>
> On Wed, Nov 9, 2016 at 3:08 PM, Michael Armbrust 
> wrote:
>
>> Which version of Spark?  Does seem like a bug.
>>
>> On Wed, Nov 9, 2016 at 10:06 AM, Raviteja Lokineni <
>> raviteja.lokin...@gmail.com> wrote:
>>
>>> Does this stacktrace look like a bug guys? Definitely seems like one to
>>> me.
>>>
>>> Caused by: java.lang.StackOverflowError
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>
>>>
>>> On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <
>>> raviteja.lokin...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am not sure if this is a bug or not. Basically I am generating weekly
>>>> aggregates of every column of data.
>>>>
>>>> Adding source code here (also attached):
>>>>
>>>> from pyspark.sql.window import Window
>>>> from pyspark.sql.functions import *
>>>>
>>>> timeSeries = sqlContext.read.option("header", "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
>>>>
>>>> # Hive timestamp is interpreted as UNIX timestamp in seconds
>>>> days = lambda i: i * 86400
>>>>
>>>> w = (Window()
>>>>      .partitionBy("id")
>>>>      .orderBy(col("dt").cast("timestamp").cast("long"))
>>>>      .rangeBetween(-days(6), 0))
>>>>
>>>> cols = ["id", "dt"]
>>>> skipCols = ["id", "dt"]
>>>>
>>>> for col in timeSeries.columns:
>>>>     if col in skipCols:
>>>>         continue
>>>>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>>>>     cols.append(count(col).over(w).alias("count_7_"+col))
>>>>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>>>>     cols.append(min(col).over(w).alias("min_7_"+col))
>>>>     cols.append(max(col).over(w).alias("max_7_"+col))
>>>>
>>>> df = timeSeries.select(cols)
>>>> df.orderBy('id', 'dt').write\
>>>>     .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
>>>>     .save("file:///tmp/spark-bug-out.csv")
>>>>
>>>>
>>>> Thanks,
>>>> --
>>>> *Raviteja Lokineni* | Business Intelligence Developer
>>>> TD Ameritrade
>>>>
>>>> E: raviteja.lokin...@gmail.com
>>>>
>>>> [image: View Raviteja Lokineni's profile on LinkedIn]
 


>>>
>>
>


Re: importing data into hdfs/spark using Informatica ETL tool

2016-11-09 Thread Jörn Franke
Basically you have mentioned the options. However, there are several ways in 
which Informatica can extract from (or store to) an RDBMS. If the native 
option is not available then you need to go via JDBC as you have described. 
Alternatively (but only if it is worth it) you can schedule fetching of the 
files via Oozie and use it to convert the CSV into ORC/Parquet etc.
If this is a common use case in the company you can extend Informatica with 
Java classes that, for instance, convert the data directly into Parquet or 
ORC. However, this takes some effort.

> On 9 Nov 2016, at 14:56, Mich Talebzadeh  wrote:
> 
> Hi,
> 
> I am exploring the idea of flexibility with importing multiple RDBMS tables 
> using Informatica that customer has into HDFS.
> 
> I don't want to use connectivity tools from Informatica to Hive etc.
> 
> So this is what I have in mind
> 
> If possible get the tables data out using Informatica and use Informatica ui  
> to convert RDBMS data into some form of CSV, TSV file (Can Informatica do 
> it?) I guess yes
> Put the flat files on an edge where HDFS node can see them.
> Assuming that a directory can be created by Informatica daily, periodically 
> run a cron that ingest that data from directories into HDFS equivalent daily 
> directories
> Once the data is in HDFS one can use, Spark csv, Hive etc to query data
> The problem I have is to see if someone has done such thing before. 
> Specifically can Informatica create target flat files on normal directories.
> 
> Any other generic alternative?
> 
> Thanks
> 
> Dr Mich Talebzadeh


Re: Aggregations on every column on dataframe causing StackOverflowError

2016-11-09 Thread Raviteja Lokineni
Well I've tried with 1.5.2, 1.6.2 and 2.0.1

FYI, I have created https://issues.apache.org/jira/browse/SPARK-18388

On Wed, Nov 9, 2016 at 3:08 PM, Michael Armbrust 
wrote:

> Which version of Spark?  Does seem like a bug.
>
> On Wed, Nov 9, 2016 at 10:06 AM, Raviteja Lokineni <
> raviteja.lokin...@gmail.com> wrote:
>
>> Does this stacktrace look like a bug guys? Definitely seems like one to
>> me.
>>
>> Caused by: java.lang.StackOverflowError
>>  at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at scala.collection.immutable.List.foreach(List.scala:381)
>>  at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at scala.collection.immutable.List.foreach(List.scala:381)
>>  at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at scala.collection.immutable.List.foreach(List.scala:381)
>>  at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>  at scala.collection.immutable.List.foreach(List.scala:381)
>>
>>
>> On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <
>> raviteja.lokin...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am not sure if this is a bug or not. Basically I am generating weekly
>>> aggregates of every column of data.
>>>
>>> Adding source code here (also attached):
>>>
>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import *
>>>
>>> timeSeries = sqlContext.read.option("header", 
>>> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
>>>
>>> # Hive timestamp is interpreted as UNIX timestamp in seconds*
>>> days = lambda i: i * 86400
>>>
>>> w = (Window()
>>>  .partitionBy("id")
>>>  .orderBy(col("dt").cast("timestamp").cast("long"))
>>>  .rangeBetween(-days(6), 0))
>>>
>>> cols = ["id", "dt"]
>>> skipCols = ["id", "dt"]
>>>
>>> for col in timeSeries.columns:
>>>     if col in skipCols:
>>>         continue
>>>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>>>     cols.append(count(col).over(w).alias("count_7_"+col))
>>>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>>>     cols.append(min(col).over(w).alias("min_7_"+col))
>>>     cols.append(max(col).over(w).alias("max_7_"+col))
>>>
>>> df = timeSeries.select(cols)
>>> df.orderBy('id', 'dt').write\
>>> .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
>>> .save("file:///tmp/spark-bug-out.csv")
>>>
>>>
>>> Thanks,
>>> --
>>> *Raviteja Lokineni* | Business Intelligence Developer
>>> TD Ameritrade
>>>
>>> E: raviteja.lokin...@gmail.com
>>>
>>>
>>>
>>
>>
>> --
>> *Raviteja Lokineni* | Business Intelligence Developer
>> TD Ameritrade
>>
>> E: raviteja.lokin...@gmail.com
>>
>>
>>
>


-- 
*Raviteja Lokineni* | Business Intelligence Developer
TD Ameritrade

E: raviteja.lokin...@gmail.com




Issue Running sparkR on YARN

2016-11-09 Thread Ian.Maloney
Hi,

I’m trying to run sparkR (1.5.2) on YARN and I get:

 java.io.IOException: Cannot run program "Rscript": error=2, No such file or 
directory

This strikes me as odd, because I can log in to each node as various users, 
type Rscript, and it works. I’ve also set R_HOME on each node and in 
spark-env.sh: export R_HOME=/path/to/R

This is how I’m setting it on the nodes (/etc/profile.d/path_edit.sh):

export R_HOME=/app/hdp_app/anaconda/bin/R
PATH=$PATH:/app/hdp_app/anaconda/bin

Any ideas?

Thanks,

Ian

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Aggregations on every column on dataframe causing StackOverflowError

2016-11-09 Thread Michael Armbrust
Which version of Spark?  Does seem like a bug.

On Wed, Nov 9, 2016 at 10:06 AM, Raviteja Lokineni <
raviteja.lokin...@gmail.com> wrote:

> Does this stacktrace look like a bug guys? Definitely seems like one to me.
>
> Caused by: java.lang.StackOverflowError
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>
>
> On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <
> raviteja.lokin...@gmail.com> wrote:
>
>> Hi all,
>>
>> I am not sure if this is a bug or not. Basically I am generating weekly
>> aggregates of every column of data.
>>
>> Adding source code here (also attached):
>>
>> from pyspark.sql.window import Window
>> from pyspark.sql.functions import *
>>
>> timeSeries = sqlContext.read.option("header", 
>> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
>>
>> # Hive timestamp is interpreted as UNIX timestamp in seconds*
>> days = lambda i: i * 86400
>>
>> w = (Window()
>>  .partitionBy("id")
>>  .orderBy(col("dt").cast("timestamp").cast("long"))
>>  .rangeBetween(-days(6), 0))
>>
>> cols = ["id", "dt"]
>> skipCols = ["id", "dt"]
>>
>> for col in timeSeries.columns:
>>     if col in skipCols:
>>         continue
>>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>>     cols.append(count(col).over(w).alias("count_7_"+col))
>>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>>     cols.append(min(col).over(w).alias("min_7_"+col))
>>     cols.append(max(col).over(w).alias("max_7_"+col))
>>
>> df = timeSeries.select(cols)
>> df.orderBy('id', 'dt').write\
>> .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
>> .save("file:///tmp/spark-bug-out.csv")
>>
>>
>> Thanks,
>> --
>> *Raviteja Lokineni* | Business Intelligence Developer
>> TD Ameritrade
>>
>> E: raviteja.lokin...@gmail.com
>>
>>
>>
>
>
> --
> *Raviteja Lokineni* | Business Intelligence Developer
> TD Ameritrade
>
> E: raviteja.lokin...@gmail.com
>
>
>


Re: importing data into hdfs/spark using Informatica ETL tool

2016-11-09 Thread ayan guha
Yes, it can be done, and it is standard practice. I would suggest a mixed
approach: use Informatica to create files in HDFS and define Hive staging
tables as external tables on those directories. From that point onwards, use
Spark.
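
A minimal sketch of the staging-table idea, assuming a hypothetical table name, column list and landing directory (none of them appear in this thread). It only builds the Hive DDL string; submitting it to Hive or Spark SQL is left out.

```python
def external_table_ddl(table, columns, location, delimiter=","):
    """Build a CREATE EXTERNAL TABLE statement for delimited text files.

    table, columns and location are illustrative placeholders.
    """
    cols = ",\n  ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        f"ROW FORMAT DELIMITED FIELDS TERMINATED BY '{delimiter}'\n"
        f"STORED AS TEXTFILE\n"
        f"LOCATION '{location}'"
    )


ddl = external_table_ddl(
    "staging.customers",                      # hypothetical table name
    [("id", "INT"), ("name", "STRING")],      # hypothetical schema
    "hdfs:///landing/informatica/customers",  # hypothetical directory
)
print(ddl)
```

Because the table is external, dropping it later would leave the files written by Informatica in place.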

Hth
Ayan
On 10 Nov 2016 04:00, "Mich Talebzadeh"  wrote:

> Thanks Mike for insight.
>
> This is a request landed on us which is rather unusual.
>
> As I understand Informatica is an ETL tool. Most of these are glorified
> Sqoop with GUI where you define your source and target.
>
> In a normal day Informatica takes data out of an RDBMS like Oracle table
> and lands it on Teradata or Sybase IQ (DW).
>
> So in our case we really need to redefine the map. Customer does not want
> the plug in from the Informatica for Hive etc which admittedly will make
> life far easier. They want us to come up with a solution.
>
> In the absence of the fact that we cannot use JDBC for Hive etc as target
> (?), the easiest option is to dump it into landing zone and then do
> whatever we want with it.
>
> Also I am not sure we can use Flume for it? That was a thought in my mind.
>
> So sort of stuck between Hard and Rock here. So in short we want a plug in
> to be consumer of Informatica.
>
> cheers
>
> Mich
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 9 November 2016 at 16:14, Michael Segel 
> wrote:
>
>> Mich,
>>
>> You could do that. But really?
>>
>> Putting on my solutions architect hat…
>>
>> You or your client is spending $$$ for product licensing and you’re not
>> really using the product to its fullest.
>>
>> Yes, you can use Informatica to pull data from the source systems and
>> provide some data cleansing and transformations before you drop it on your
>> landing zone.
>>
>> If you’re going to bypass Hive, then you have to capture the schema,
>> including data types.  You’re also going to have to manage schema evolution
>> as they change over time. (I believe the ETL tools will do this for you or
>> help in the process.)
>>
>> But if you’re already working on the consumption process for ingestion on
>> your own… what is the value that you derive from using Informatica?  Is the
>> unloading and ingestion process that difficult that you can’t write that as
>> well?
>>
>> My point is that if you’re going to use the tool, use it as the vendor
>> recommends (and they may offer options…) or skip it.
>>
>> I mean heck… you may want to take the flat files (CSV, etc) that are
>> dropped in the landing zone, and then ingest and spit out parquet files via
>> spark. You just need to know the Schema(s) of ingestion and output if they
>> are not the same. ;-)
>>
>> Of course you may decide that using Informatica to pull and transform the
>> data and drop it on to the landing zone provides enough value to justify
>> its expense.  ;-) YMMV
>>
>> Just my $0.02 worth.
>>
>> Take it with a grain of Kosher Sea Salt.  (The grains are larger and the
>> salt taste’s better) ;-)
>>
>> -Mike
>>
>> On Nov 9, 2016, at 7:56 AM, Mich Talebzadeh 
>> wrote:
>>
>> Hi,
>>
>> I am exploring the idea of flexibility with importing multiple RDBMS
>> tables using Informatica that customer has into HDFS.
>>
>> I don't want to use connectivity tools from Informatica to Hive etc.
>>
>> So this is what I have in mind
>>
>>
>>1. If possible get the tables data out using Informatica and use
>>Informatica ui  to convert RDBMS data into some form of CSV, TSV file (Can
>>Informatica do it?) I guess yes
>>2. Put the flat files on an edge where HDFS node can see them.
>>3. Assuming that a directory can be created by Informatica daily,
>>periodically run a cron that ingest that data from directories into HDFS
>>equivalent daily directories
>>4. Once the data is in HDFS one can use, Spark csv, Hive etc to query
>>data
>>
>> The problem I have is to see if someone has done such thing before.
>> Specifically can Informatica create target flat files on normal directories.
>>
>> Any other generic alternative?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or 

Re: Aggregations on every column on dataframe causing StackOverflowError

2016-11-09 Thread Raviteja Lokineni
Does this stacktrace look like a bug guys? Definitely seems like one to me.

Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
    at scala.collection.immutable.List.foreach(List.scala:381)


On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <
raviteja.lokin...@gmail.com> wrote:

> Hi all,
>
> I am not sure if this is a bug or not. Basically I am generating weekly
> aggregates of every column of data.
>
> Adding source code here (also attached):
>
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
>
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
>
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
>
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
>
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
>
> for col in timeSeries.columns:
>     if col in skipCols:
>         continue
>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>     cols.append(count(col).over(w).alias("count_7_"+col))
>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>     cols.append(min(col).over(w).alias("min_7_"+col))
>     cols.append(max(col).over(w).alias("max_7_"+col))
>
> df = timeSeries.select(cols)
> df.orderBy('id', 'dt').write\
> .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
> .save("file:///tmp/spark-bug-out.csv")
>
>
> Thanks,
> --
> *Raviteja Lokineni* | Business Intelligence Developer
> TD Ameritrade
>
> E: raviteja.lokin...@gmail.com
>
>
>
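
To make the quoted window definition concrete: rangeBetween(-days(6), 0) over a timestamp cast to seconds covers every row of the same id whose time lies within the six days before the current row, up to and including the current row's time. The plain-Python sketch below applies that rule to a single id with made-up sample values. It involves no Spark and says nothing about the StackOverflowError itself.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(days=6)  # same span as days(6) in the quoted code


def rolling_mean_7d(rows):
    """rows: list of (datetime, value) pairs for a single id.

    For each row, average all values whose timestamp u satisfies
    t - 6 days <= u <= t, mirroring rangeBetween(-days(6), 0).
    """
    out = []
    for t, _ in sorted(rows):
        in_range = [v for u, v in rows if t - WINDOW <= u <= t]
        out.append((t, sum(in_range) / len(in_range)))
    return out


# Invented sample data for illustration only.
sample = [
    (datetime(2016, 11, 1), 10.0),
    (datetime(2016, 11, 4), 20.0),
    (datetime(2016, 11, 8), 60.0),
]
result = rolling_mean_7d(sample)
for t, m in result:
    print(t.date(), m)
```

The row for 8 November no longer sees the value from 1 November, because that value falls outside the six-day look-back.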


-- 
*Raviteja Lokineni* | Business Intelligence Developer
TD Ameritrade

E: raviteja.lokin...@gmail.com




Re: Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-09 Thread coolgar
Solution provided by Cody K :

I may be misunderstanding, but you need to take each kafka message,
and turn it into multiple items in the transformed rdd?

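so something like (pseudocode; isHeader, someParserBasedOn, Parser and Item
are placeholders):

import scala.collection.mutable.ArrayBuffer

stream.flatMap { message =>
  val items = new ArrayBuffer[Item]
  var parser: Parser = null
  message.split("\n").foreach { line =>
    if (isHeader(line)) {
      // a header line selects the parser for the lines that follow
      parser = someParserBasedOn(line)
    } else {
      items += parser.parse(line)
    }
  }
  items.iterator
}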
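
The same idea as a small runnable Python sketch. The message format, is_header and parser_for are invented here for illustration; the thread does not describe the real formats. The function turns one multi-line message into a list of records, switching parsers whenever a header line appears.

```python
def parse_message(message, is_header, parser_for):
    """Turn one multi-line message into a list of parsed records.

    is_header and parser_for are hypothetical callables supplied by the
    caller: is_header(line) -> bool, parser_for(header_line) -> function
    that parses one data line.
    """
    items = []
    parse = None
    for line in message.split("\n"):
        if not line:
            continue  # skip blank lines, e.g. from a trailing newline
        if is_header(line):
            parse = parser_for(line)  # later lines use this parser
        elif parse is None:
            raise ValueError("data line before any header: %r" % line)
        else:
            items.append(parse(line))
    return items


# Toy format, invented for illustration: a header "#a,b" names the fields
# of the comma-separated lines that follow it.
def is_header(line):
    return line.startswith("#")


def parser_for(header):
    names = header[1:].split(",")
    return lambda line: dict(zip(names, line.split(",")))


records = parse_message("#id,temp\n1,20\n#id,rh,temp\n2,55,21\n",
                        is_header, parser_for)
print(records)
```

With a PySpark DStream this function could be passed to flatMap, for example stream.flatMap(lambda m: parse_message(m, is_header, parser_for)), assuming each element of the stream is one message string.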




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037p28054.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Save a spark RDD to disk

2016-11-09 Thread Michael Segel
Can you increase the number of partitions and also increase the number of 
executors?
(This should improve the parallelization but you may become disk i/o bound)

On Nov 8, 2016, at 4:08 PM, Elf Of Lothlorein wrote:

Hi,
I am trying to save an RDD to disk using saveAsNewAPIHadoopFile. It takes 
almost 20 minutes for about 900 GB of data. Is there any parameter that I can 
tune to make the save faster?
I am running about 45 executors with 5 cores each on 5 Spark worker nodes, 
using Spark on YARN.
Thanks for your help.
C
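
A quick back-of-the-envelope calculation using only the figures quoted above (900 GB, 20 minutes, 45 executors with 5 cores, 5 nodes). It assumes 1 GB = 1000 MB and an even spread of work. Whether the per-node rate is close to a disk or network limit depends on hardware and replication settings that the thread does not state.

```python
data_gb = 900            # from the message
minutes = 20             # from the message
executors, cores = 45, 5
nodes = 5

total_gb_per_s = data_gb / (minutes * 60)            # aggregate write rate
per_node_mb_per_s = total_gb_per_s * 1000 / nodes    # rate each node sustains
task_slots = executors * cores                       # tasks that can run at once
per_slot_mb_per_s = total_gb_per_s * 1000 / task_slots

print(f"aggregate: {total_gb_per_s:.2f} GB/s")
print(f"per node:  {per_node_mb_per_s:.0f} MB/s")
print(f"per task slot ({task_slots} slots): {per_slot_mb_per_s:.2f} MB/s")
```

If the per-node figure is already near what the disks can sustain, adding partitions or executors would raise parallelism without making the save faster, which is the disk I/O caveat mentioned in the reply.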



Re: importing data into hdfs/spark using Informatica ETL tool

2016-11-09 Thread Mich Talebzadeh
Thanks Mike for the insight.

This is a request that landed on us, and it is rather unusual.

As I understand it, Informatica is an ETL tool. Most of these are a glorified
Sqoop with a GUI where you define your source and target.

On a normal day Informatica takes data out of an RDBMS such as an Oracle table
and lands it on Teradata or Sybase IQ (DW).

So in our case we really need to redefine the map. The customer does not want
the Informatica plug-in for Hive etc., which admittedly would make
life far easier. They want us to come up with a solution.

Given that we cannot use JDBC for Hive etc. as the target
(?), the easiest option is to dump the data into a landing zone and then do
whatever we want with it.

Also, I am not sure whether we can use Flume for it. That was a thought in my
mind.

So we are sort of stuck between a rock and a hard place here. In short, we
want a plug-in that acts as a consumer of Informatica.

cheers

Mich

Dr Mich Talebzadeh






On 9 November 2016 at 16:14, Michael Segel 
wrote:

> Mich,
>
> You could do that. But really?
>
> Putting on my solutions architect hat…
>
> You or your client is spending $$$ for product licensing and you’re not
> really using the product to its fullest.
>
> Yes, you can use Informatica to pull data from the source systems and
> provide some data cleansing and transformations before you drop it on your
> landing zone.
>
> If you’re going to bypass Hive, then you have to capture the schema,
> including data types.  You’re also going to have to manage schema evolution
> as they change over time. (I believe the ETL tools will do this for you or
> help in the process.)
>
> But if you’re already working on the consumption process for ingestion on
> your own… what is the value that you derive from using Informatica?  Is the
> unloading and ingestion process that difficult that you can’t write that as
> well?
>
> My point is that if you’re going to use the tool, use it as the vendor
> recommends (and they may offer options…) or skip it.
>
> I mean heck… you may want to take the flat files (CSV, etc) that are
> dropped in the landing zone, and then ingest and spit out parquet files via
> spark. You just need to know the Schema(s) of ingestion and output if they
> are not the same. ;-)
>
> Of course you may decide that using Informatica to pull and transform the
> data and drop it on to the landing zone provides enough value to justify
> its expense.  ;-) YMMV
>
> Just my $0.02 worth.
>
> Take it with a grain of Kosher Sea Salt.  (The grains are larger and the
> salt taste’s better) ;-)
>
> -Mike
>
> On Nov 9, 2016, at 7:56 AM, Mich Talebzadeh 
> wrote:
>
> Hi,
>
> I am exploring the idea of flexibility with importing multiple RDBMS
> tables using Informatica that customer has into HDFS.
>
> I don't want to use connectivity tools from Informatica to Hive etc.
>
> So this is what I have in mind
>
>
>1. If possible get the tables data out using Informatica and use
>Informatica ui  to convert RDBMS data into some form of CSV, TSV file (Can
>Informatica do it?) I guess yes
>2. Put the flat files on an edge where HDFS node can see them.
>3. Assuming that a directory can be created by Informatica daily,
>periodically run a cron that ingest that data from directories into HDFS
>equivalent daily directories
>4. Once the data is in HDFS one can use, Spark csv, Hive etc to query
>data
>
> The problem I have is to see if someone has done such thing before.
> Specifically can Informatica create target flat files on normal directories.
>
> Any other generic alternative?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>


Re: Physical plan for windows and joins - how to know which is faster?

2016-11-09 Thread Silvio Fiorito
Hi Jacek,


I haven't played with 2.1.0 yet, so I'm not sure how much more optimized 
Window functions are compared to 1.6 and 2.0.


However, one thing I do see in the self-join is a broadcast, so the results of 
the groupBy have to be broadcast out to the executors before the join can run. 
In both cases the data is shuffled (for the groupBy or the Window).


Have you tried running both queries to see? It would be interesting to test on 
varying data volumes as well (e.g. what happens if there's no broadcast).
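
A minimal sketch of such a comparison, written as a generic timing helper in plain Python so that it runs without Spark. The two workloads below are stand-ins. In a real test each callable would wrap an action that forces full evaluation of one query variant (for example a count on the windowed DataFrame and on the joined one), which is an assumption about how you would wire it up.

```python
import time


def time_variants(variants, repeats=3):
    """Run each zero-argument callable `repeats` times.

    variants: dict mapping a label to a callable that forces evaluation.
    Returns a dict mapping each label to its best wall-clock time in seconds.
    """
    best = {}
    for label, run in variants.items():
        timings = []
        for _ in range(repeats):
            start = time.perf_counter()
            run()
            timings.append(time.perf_counter() - start)
        best[label] = min(timings)
    return best


# Stand-in workloads so the sketch runs on its own.
results = time_variants({
    "window": lambda: sum(range(100_000)),
    "self_join": lambda: sorted(range(100_000), reverse=True),
})
for label, seconds in results.items():
    print(f"{label}: {seconds:.4f} s")
```

Taking the best of several runs reduces noise from JVM warm-up and caching, although the first run is often the more realistic number for a one-off batch job.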


Thanks,

Silvio


From: Jacek Laskowski 
Sent: Wednesday, November 9, 2016 7:36:47 AM
To: user
Subject: Physical plan for windows and joins - how to know which is faster?

Hi,

While playing around with Spark 2.1.0-SNAPSHOT (built today) and
explain'ing two queries with WindowSpec and inner join I found the
following plans and am wondering if you could help me to judge which
query could be faster.

What else would you ask for to be able to answer the question of one
being more efficient than the other?

Just by looking at Spark's "stack traces" of the queries, one could
say that the windowed variant (the first one) is going to be faster (as there
are fewer physical operators), yet the top-level Window operator is not
codegened, so that might be misleading.

I'd appreciate your help to get me better at reading such trees. Thanks!

scala> mydf.withColumn("sum(id)", sum('id) over byId3).explain
== Physical Plan ==
Window [sum(cast(id#15 as bigint)) windowspecdefinition(ID % 3#60, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(id)#665L], [ID % 3#60]
+- *Sort [ID % 3#60 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ID % 3#60, 200)
      +- LocalTableScan [id#15, multiplied#16, ID % 3#60]

scala> mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").explain
== Physical Plan ==
*Project [ID % 3#60, id#15, multiplied#16, sum(id)#677L]
+- *BroadcastHashJoin [ID % 3#60], [ID % 3#681], Inner, BuildRight
   :- *Project [_1#12 AS id#15, _2#13 AS multiplied#16, (_1#12 % 3) AS ID % 3#60]
   :  +- *Filter isnotnull((_1#12 % 3))
   :     +- LocalTableScan [_1#12, _2#13]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *HashAggregate(keys=[ID % 3#681], functions=[sum(cast(id#15 as bigint))])
         +- Exchange hashpartitioning(ID % 3#681, 200)
            +- *HashAggregate(keys=[ID % 3#681], functions=[partial_sum(cast(id#15 as bigint))])
               +- *Project [_1#12 AS id#15, (_1#12 % 3) AS ID % 3#681]
                  +- *Filter isnotnull((_1#12 % 3))
                     +- LocalTableScan [_1#12, _2#13]

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski




How to impersonate a user from a Spark program

2016-11-09 Thread Samy Dindane

Hi,

In order to impersonate a user when submitting a job with `spark-submit`, the 
`proxy-user` option is used.
Is there a similar feature when running a job inside a Scala program? Maybe by 
specifying some configuration value?

Thanks.

Samy




Re: javac - No such file or directory

2016-11-09 Thread Sonal Goyal
It looks like an issue with the Java compiler. Is the JDK set up correctly?
Please check your Java installation.
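
One way to check is sketched below. error=2 means the exact path in the message could not be found, so the helper reports whether the file, its directory, or a symlink target is missing. The helper name is made up for this example. A possible cause, which is only an assumption and not confirmed in this thread, is that the JRE package is installed without the matching JDK (devel) package, or that JAVA_HOME still points at a directory replaced by a package update.

```python
import os


def describe_executable(path):
    """Return a short diagnosis of why `path` may fail to run."""
    if os.path.islink(path) and not os.path.exists(path):
        return "dangling symlink -> " + os.path.realpath(path)
    if not os.path.exists(path):
        parent = os.path.dirname(path)
        if os.path.isdir(parent):
            return "missing file, but directory exists: " + parent
        return "missing directory: " + parent
    if not os.access(path, os.X_OK):
        return "present but not executable"
    return "ok"


# Path copied from the error message in this thread.
javac = "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-1.b15.el7_2.x86_64/bin/javac"
print(describe_executable(javac))
```

Run it as the same user that runs the build, since permissions and environment can differ between accounts.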

Thanks,
Sonal
Nube Technologies 





On Wed, Nov 9, 2016 at 7:13 PM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> I'm getting this error trying to build spark on Centos7. It is not
> googling very well:
>
> [error] (tags/compile:compileIncremental) java.io.IOException: Cannot run
> program 
> "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-1.b15.el7_2.x86_64/bin/javac"
> (in directory "/home/spark/spark"): error=2, No such file or directory
> Any ideas?
>
> Thanks,
>
> Andrew
>
>
>
> Full output:
>
> [success] created output: /home/spark/spark/external/kafka-0-10/target
>
> [info] Compiling 2 Scala sources and 8 Java sources to
> /home/spark/spark/common/tags/target/scala-2.11/classes...
>
> java.io.IOException: Cannot run program "/usr/lib/jvm/java-1.8.0-
> openjdk-1.8.0.111-1.b15.el7_2.x86_64/bin/javac" (in directory
> "/home/spark/spark"): error=2, No such file or directory
>
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>
> at sbt.SimpleProcessBuilder.run(ProcessImpl.scala:349)
>
> at sbt.AbstractProcessBuilder.run(ProcessImpl.scala:128)
>
> at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(
> ProcessImpl.scala:159)
>
> at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(
> ProcessImpl.scala:159)
>
> at sbt.compiler.javac.JavacLogger.buffer(JavacProcessLogger.scala:31)
>
> at sbt.AbstractProcessBuilder.runBuffered(ProcessImpl.scala:159)
>
> at sbt.AbstractProcessBuilder.$bang(ProcessImpl.scala:156)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$launch$1.apply(
> ForkedJava.scala:24)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$launch$1.apply(
> ForkedJava.scala:17)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$withArgumentFile$1.
> apply(ForkedJava.scala:47)
>
> at sbt.compiler.javac.ForkedJava$$anonfun$withArgumentFile$1.
> apply(ForkedJava.scala:44)
>
> at sbt.IO$.withTemporaryDirectory(IO.scala:344)
>
> at sbt.compiler.javac.ForkedJava$.withArgumentFile(ForkedJava.scala:44)
>
> at sbt.compiler.javac.ForkedJava$.launch(ForkedJava.scala:17)
>
> at sbt.compiler.javac.ForkedJavaCompiler.run(ForkedJava.scala:68)
>
> at sbt.compiler.javac.JavaCompilerAdapter.compileWithReporter(
> JavaCompilerAdapter.scala:31)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply$mcV$
> sp(AnalyzingJavaCompiler.scala:65)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply(
> AnalyzingJavaCompiler.scala:65)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply(
> AnalyzingJavaCompiler.scala:65)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler.timed(
> AnalyzingJavaCompiler.scala:93)
>
> at sbt.compiler.javac.AnalyzingJavaCompiler.compile(
> AnalyzingJavaCompiler.scala:64)
>
> at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply$
> mcV$sp(MixedAnalyzingCompiler.scala:60)
>
> at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply(
> MixedAnalyzingCompiler.scala:60)
>
> at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply(
> MixedAnalyzingCompiler.scala:60)
>
> at sbt.compiler.MixedAnalyzingCompiler.timed(MixedAnalyzingCompiler.scala:
> 74)
>
> at sbt.compiler.MixedAnalyzingCompiler.compileJava$1(
> MixedAnalyzingCompiler.scala:59)
>
> at sbt.compiler.MixedAnalyzingCompiler.compile(
> MixedAnalyzingCompiler.scala:64)
>
> at sbt.compiler.IC$$anonfun$compileInternal$1.apply(
> IncrementalCompiler.scala:160)
>
> at sbt.compiler.IC$$anonfun$compileInternal$1.apply(
> IncrementalCompiler.scala:160)
>
> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:66)
>
> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:64)
>
> at sbt.inc.IncrementalCommon.cycle(IncrementalCommon.scala:32)
>
> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:68)
>
> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:67)
>
> at sbt.inc.Incremental$.manageClassfiles(Incremental.scala:95)
>
> at sbt.inc.Incremental$.compile(Incremental.scala:67)
>
> at sbt.inc.IncrementalCompile$.apply(Compile.scala:54)
>
> at sbt.compiler.IC$.compileInternal(IncrementalCompiler.scala:160)
>
> at sbt.compiler.IC$.incrementalCompile(IncrementalCompiler.scala:138)
>
> at sbt.Compiler$.compile(Compiler.scala:152)
>
> at sbt.Compiler$.compile(Compiler.scala:138)
>
> at sbt.Defaults$.sbt$Defaults$$compileIncrementalTaskImpl(
> Defaults.scala:860)
>
> at sbt.Defaults$$anonfun$compileIncrementalTask$1.
> apply(Defaults.scala:851)
>
> at sbt.Defaults$$anonfun$compileIncrementalTask$1.
> apply(Defaults.scala:849)
>
> at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>
> at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
>
> at sbt.std.Transform$$anon$4.work(System.scala:63)
>
> at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
>
> at 

importing data into hdfs/spark using Informatica ETL tool

2016-11-09 Thread Mich Talebzadeh
Hi,

I am exploring how flexibly we can import multiple RDBMS tables into HDFS
using the Informatica installation the customer already has.

I don't want to use connectivity tools from Informatica to Hive etc.

So this is what I have in mind:


   1. If possible, get the table data out using Informatica and use the
   Informatica UI to convert the RDBMS data into some form of CSV or TSV file
   (can Informatica do it? I guess yes).
   2. Put the flat files on an edge node where HDFS can see them.
   3. Assuming that Informatica can create a directory daily,
   periodically run a cron job that ingests the data from those directories
   into equivalent daily directories in HDFS.
   4. Once the data is in HDFS, one can use Spark CSV, Hive etc. to query
   the data.
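
Step 3 above could be sketched as a small script for cron to call. The local and HDFS directory names and the YYYY-MM-DD naming of daily directories are assumptions made for illustration. The script defaults to a dry run that only prints the `hdfs dfs -put` command it would execute.

```python
import subprocess
from datetime import date


def hdfs_put_command(local_root, hdfs_root, day):
    """Build the `hdfs dfs -put` call that copies one daily directory."""
    stamp = day.strftime("%Y-%m-%d")  # assumed naming of the daily directory
    return ["hdfs", "dfs", "-put", f"{local_root}/{stamp}", f"{hdfs_root}/"]


def ingest(local_root, hdfs_root, day, dry_run=True):
    """Print the command (dry run) or execute it, failing on a non-zero exit."""
    cmd = hdfs_put_command(local_root, hdfs_root, day)
    if dry_run:
        print(" ".join(cmd))
    else:
        subprocess.run(cmd, check=True)
    return cmd


# Hypothetical paths on the edge node and in HDFS.
cmd = ingest("/data/landing", "/staging/informatica", date(2016, 11, 9))
```

A real job would also need to make sure Informatica has finished writing the day's files before copying them, for example by waiting for a marker file. That convention is not part of the original proposal.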

What I would like to know is whether someone has done such a thing before.
Specifically, can Informatica create target flat files in normal directories?

Any other generic alternative?

Thanks

Dr Mich Talebzadeh





javac - No such file or directory

2016-11-09 Thread Andrew Holway
I'm getting this error trying to build spark on Centos7. It is not googling
very well:

[error] (tags/compile:compileIncremental) java.io.IOException: Cannot run
program
"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-1.b15.el7_2.x86_64/bin/javac"
(in directory "/home/spark/spark"): error=2, No such file or directory
Any ideas?

Thanks,

Andrew



Full output:

[success] created output: /home/spark/spark/external/kafka-0-10/target

[info] Compiling 2 Scala sources and 8 Java sources to
/home/spark/spark/common/tags/target/scala-2.11/classes...

java.io.IOException: Cannot run program "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-1.b15.el7_2.x86_64/bin/javac" (in directory "/home/spark/spark"): error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at sbt.SimpleProcessBuilder.run(ProcessImpl.scala:349)
    at sbt.AbstractProcessBuilder.run(ProcessImpl.scala:128)
    at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(ProcessImpl.scala:159)
    at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(ProcessImpl.scala:159)
    at sbt.compiler.javac.JavacLogger.buffer(JavacProcessLogger.scala:31)
    at sbt.AbstractProcessBuilder.runBuffered(ProcessImpl.scala:159)
    at sbt.AbstractProcessBuilder.$bang(ProcessImpl.scala:156)
    at sbt.compiler.javac.ForkedJava$$anonfun$launch$1.apply(ForkedJava.scala:24)
    at sbt.compiler.javac.ForkedJava$$anonfun$launch$1.apply(ForkedJava.scala:17)
    at sbt.compiler.javac.ForkedJava$$anonfun$withArgumentFile$1.apply(ForkedJava.scala:47)
    at sbt.compiler.javac.ForkedJava$$anonfun$withArgumentFile$1.apply(ForkedJava.scala:44)
    at sbt.IO$.withTemporaryDirectory(IO.scala:344)
    at sbt.compiler.javac.ForkedJava$.withArgumentFile(ForkedJava.scala:44)
    at sbt.compiler.javac.ForkedJava$.launch(ForkedJava.scala:17)
    at sbt.compiler.javac.ForkedJavaCompiler.run(ForkedJava.scala:68)
    at sbt.compiler.javac.JavaCompilerAdapter.compileWithReporter(JavaCompilerAdapter.scala:31)
    at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply$mcV$sp(AnalyzingJavaCompiler.scala:65)
    at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply(AnalyzingJavaCompiler.scala:65)
    at sbt.compiler.javac.AnalyzingJavaCompiler$$anonfun$compile$1.apply(AnalyzingJavaCompiler.scala:65)
    at sbt.compiler.javac.AnalyzingJavaCompiler.timed(AnalyzingJavaCompiler.scala:93)
    at sbt.compiler.javac.AnalyzingJavaCompiler.compile(AnalyzingJavaCompiler.scala:64)
    at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply$mcV$sp(MixedAnalyzingCompiler.scala:60)
    at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply(MixedAnalyzingCompiler.scala:60)
    at sbt.compiler.MixedAnalyzingCompiler$$anonfun$compileJava$1$1.apply(MixedAnalyzingCompiler.scala:60)
    at sbt.compiler.MixedAnalyzingCompiler.timed(MixedAnalyzingCompiler.scala:74)
    at sbt.compiler.MixedAnalyzingCompiler.compileJava$1(MixedAnalyzingCompiler.scala:59)
    at sbt.compiler.MixedAnalyzingCompiler.compile(MixedAnalyzingCompiler.scala:64)
    at sbt.compiler.IC$$anonfun$compileInternal$1.apply(IncrementalCompiler.scala:160)
    at sbt.compiler.IC$$anonfun$compileInternal$1.apply(IncrementalCompiler.scala:160)
    at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:66)
    at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:64)
    at sbt.inc.IncrementalCommon.cycle(IncrementalCommon.scala:32)
    at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:68)
    at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:67)
    at sbt.inc.Incremental$.manageClassfiles(Incremental.scala:95)
    at sbt.inc.Incremental$.compile(Incremental.scala:67)
    at sbt.inc.IncrementalCompile$.apply(Compile.scala:54)
    at sbt.compiler.IC$.compileInternal(IncrementalCompiler.scala:160)
    at sbt.compiler.IC$.incrementalCompile(IncrementalCompiler.scala:138)
    at sbt.Compiler$.compile(Compiler.scala:152)
    at sbt.Compiler$.compile(Compiler.scala:138)
    at sbt.Defaults$.sbt$Defaults$$compileIncrementalTaskImpl(Defaults.scala:860)
    at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:851)
    at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:849)
    at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
    at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
    at sbt.std.Transform$$anon$4.work(System.scala:63)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
    at sbt.Execute.work(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)


Physical plan for windows and joins - how to know which is faster?

2016-11-09 Thread Jacek Laskowski
Hi,

While playing around with Spark 2.1.0-SNAPSHOT (built today) and
explain'ing two queries with WindowSpec and inner join I found the
following plans and am wondering if you could help me to judge which
query could be faster.

What else would you ask for to be able to answer the question of one
being more efficient than the other?

Just by looking at Spark's "stack traces" of the queries, one could
say that the windowed variant (the first one) is going to be faster (as there
are fewer physical operators), yet the top-level Window operator is not
codegened, so that might be misleading.

I'd appreciate your help to get me better at reading such trees. Thanks!

scala> mydf.withColumn("sum(id)", sum('id) over byId3).explain
== Physical Plan ==
Window [sum(cast(id#15 as bigint)) windowspecdefinition(ID % 3#60, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(id)#665L], [ID % 3#60]
+- *Sort [ID % 3#60 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ID % 3#60, 200)
      +- LocalTableScan [id#15, multiplied#16, ID % 3#60]

scala> mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").explain
== Physical Plan ==
*Project [ID % 3#60, id#15, multiplied#16, sum(id)#677L]
+- *BroadcastHashJoin [ID % 3#60], [ID % 3#681], Inner, BuildRight
   :- *Project [_1#12 AS id#15, _2#13 AS multiplied#16, (_1#12 % 3) AS ID % 3#60]
   :  +- *Filter isnotnull((_1#12 % 3))
   :     +- LocalTableScan [_1#12, _2#13]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *HashAggregate(keys=[ID % 3#681], functions=[sum(cast(id#15 as bigint))])
         +- Exchange hashpartitioning(ID % 3#681, 200)
            +- *HashAggregate(keys=[ID % 3#681], functions=[partial_sum(cast(id#15 as bigint))])
               +- *Project [_1#12 AS id#15, (_1#12 % 3) AS ID % 3#681]
                  +- *Filter isnotnull((_1#12 % 3))
                     +- LocalTableScan [_1#12, _2#13]
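
One way to go beyond reading the two plans is to run both variants on the same data and time them. The following is only a minimal sketch, not a rigorous benchmark. The definitions of mydf and byId3 are not part of this message, so they are reconstructed here from the plans as an assumption; the object name WindowVsJoin, the timeMs helper, and the row count are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object WindowVsJoin {
  // Hypothetical helper: crude wall-clock timing in milliseconds.
  def timeMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000L
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("window-vs-join")
      .getOrCreate()
    import spark.implicits._

    // Assumed reconstruction of mydf and byId3 (not given in the message).
    val mydf = (0 until 100000).map(i => (i, i * 2))
      .toDF("id", "multiplied")
      .withColumn("ID % 3", col("id") % 3)
    val byId3 = Window.partitionBy("ID % 3")

    // The two queries whose plans are shown above.
    val windowed = mydf.withColumn("sum(id)", sum(col("id")) over byId3)
    val joined = mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3")

    // Converting to an RDD and counting forces every row to be produced.
    // Run each variant once as a warm-up, then time a second run.
    windowed.rdd.count(); joined.rdd.count()
    println(s"window: ${timeMs(windowed.rdd.count())} ms")
    println(s"join:   ${timeMs(joined.rdd.count())} ms")

    spark.stop()
  }
}
```

Timings from a local run on a small dataset say little about behavior on a cluster, so numbers like these are at best a starting point for comparing the two plans.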

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-09 Thread Carlo . Allocca
Hi Masood,

Thanks for the answer.
Sure. I will do as suggested.

Many Thanks,
Best Regards,
Carlo
On 8 Nov 2016, at 17:19, Masood Krohy wrote:

labels



Application config management

2016-11-09 Thread Erwan ALLAIN
Hi everyone,

I'd like to know what kind of configuration mechanism is generally used.

Below is what I'm going to implement, but I'd like to know if there is any
"standard way":

1) put the configuration in HDFS
2) specify extraJavaOptions (driver and executor) with the HDFS URL
   (hdfs://ip:port/config)
3) in application code,
 - concatenate the HDFS path + config file name
 - download the file
 - load it with Typesafe Config

What do you think?
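
The three steps above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not an established standard: the system property name app.config.base, the object name HdfsConfig, and the helper names are hypothetical, and the code assumes the Hadoop client and Typesafe Config libraries are on the classpath.

```scala
import java.io.InputStreamReader
import java.nio.charset.StandardCharsets

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsConfig {
  // Hypothetical property name, passed to the JVMs via e.g.
  //   --conf spark.driver.extraJavaOptions=-Dapp.config.base=hdfs://ip:port/config
  //   --conf spark.executor.extraJavaOptions=-Dapp.config.base=hdfs://ip:port/config
  val BaseProperty = "app.config.base"

  // Step 3, first bullet: concatenate the HDFS base path and the file name.
  def configPath(base: String, fileName: String): String =
    base.stripSuffix("/") + "/" + fileName

  // Step 3, remaining bullets: read the file from HDFS and parse it
  // with Typesafe Config.
  def load(fileName: String): Config = {
    val base = sys.props.getOrElse(BaseProperty,
      sys.error(s"System property $BaseProperty is not set"))
    val path = new Path(configPath(base, fileName))
    // FileSystem.get returns a cached, shared instance, so it is not closed here.
    val fs = FileSystem.get(path.toUri, new Configuration())
    val reader = new InputStreamReader(fs.open(path), StandardCharsets.UTF_8)
    try ConfigFactory.parseReader(reader).resolve()
    finally reader.close()
  }
}
```

A call such as HdfsConfig.load("application.conf") (the file name is illustrative) would then return a Config object that the application code can query as usual.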


Re: installing spark-jobserver on cdh 5.7 and yarn

2016-11-09 Thread Noorul Islam K M
Reza zade writes:

> Hi
>
> I have set up a cloudera cluster and work with spark. I want to install
> spark-jobserver on it. What should I do?

Maybe you should send this to spark-jobserver mailing list.

https://github.com/spark-jobserver/spark-jobserver#contact

Thanks and Regards
Noorul




installing spark-jobserver on cdh 5.7 and yarn

2016-11-09 Thread Reza zade
Hi

I have set up a cloudera cluster and work with spark. I want to install
spark-jobserver on it. What should I do?


Spark streaming delays spikes

2016-11-09 Thread Shlomi.b
Hi,
We are using Spark Streaming version 1.6.2 and came across some weird behavior.
Our system pulls log event data from Flume servers, enriches the events, and
saves them to ES.
We are using a window interval of 15 seconds, and the rate at peak hours is
around 70K events.

The average time to process the data and index it to ES for a window
interval is about 12 seconds, but every 4-5 window intervals we see a
spike to 18-22 seconds.

Looking at the Spark UI, we see strange behavior.
Most of the time it shows that every executor has indexed a few thousand
records to ES, with a size of around 5M. When a spike interval
happens, we see that 2 jobs were created to index data to ES, where the
second job took 6-9 seconds to index 1 record of about 1800M.

Two points I would like to clarify:
1. All of our original events are 3KB-5KB in size.
2. When we change the application to save the RDD as a text file (which, of
course, takes less time than ES), we see the same weird behavior, with a
spike every 4-5 window intervals.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-delays-spikes-tp28052.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
