Re: spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-10 Thread Hemanth Gudela
Yeah, installing HDFS in our environment is unfortunately going to take a lot of 
time (approvals/planning etc.). I will have to live with the local FS for now.
The other option I had already tried is collect(), sending everything to the driver 
node. But my data volume is too big for the driver node to handle alone.

I’m now trying to split the data into multiple datasets, then collect each 
dataset individually and write it to the local FS on the driver node (this approach slows 
down the spark job, but I hope it works).
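
Roughly, the approach I’m sketching is something like this (the output path, number of splits and dataframe name below are only placeholders, not my actual code):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

val outputPath = Paths.get("/tmp/myOutput.csv")            // local path on the driver node
val chunks = myDataFrame.randomSplit(Array.fill(10)(1.0))  // split into ~10 roughly equal pieces

chunks.foreach { chunk =>
  // collect one small piece at a time so the driver never holds the full dataset
  // (naive CSV formatting here, no quoting/escaping)
  val lines = chunk.collect().map(_.mkString(",")).mkString("", "\n", "\n")
  Files.write(outputPath, lines.getBytes(StandardCharsets.UTF_8),
    StandardOpenOption.CREATE, StandardOpenOption.APPEND)
}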

Thank you,
Hemanth

From: Femi Anthony <femib...@gmail.com>
Date: Thursday, 10 August 2017 at 11.24
To: Hemanth Gudela <hemanth.gud...@qvantel.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: spark.write.csv is not able write files to specified path, but is 
writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

Also, why are you trying to write results locally if you're not using a 
distributed file system? Spark is geared towards writing to a distributed file 
system. I would suggest trying collect() so the data is sent to the master 
and then doing the write if the result set isn't too big, or repartitioning before 
trying to write (though I suspect this won't really help). You really should 
install HDFS if that is possible.
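
For instance, something along these lines (the dataframe name and output path are just illustrative):

// collect to the driver and handle the rows there, if the result set is small enough
val rows = myDataFrame.collect()

// or repartition down to fewer partitions before writing
myDataFrame.repartition(1).write.mode("overwrite").csv("file:///some/local/path")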

Sent from my iPhone

On Aug 10, 2017, at 3:58 AM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Thanks for the reply, Femi!

I’m writing the file like this --> 
myDataFrame.write.mode("overwrite").csv("myFilePath")
There are absolutely no errors/warnings after the write.

A _SUCCESS file is created on the master node, but the _temporary problem is 
seen only on the worker nodes.

I know spark.write.csv works best with HDFS, but with the current setup I have 
in my environment, I have to deal with Spark writing to the nodes’ local file systems 
and not to HDFS.

Regards,
Hemanth

From: Femi Anthony <femib...@gmail.com<mailto:femib...@gmail.com>>
Date: Thursday, 10 August 2017 at 10.38
To: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: spark.write.csv is not able write files to specified path, but is 
writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

Normally the _temporary directory gets deleted as part of the cleanup when the 
write is complete and a SUCCESS file is created. I suspect that the writes are 
not properly completed. How are you specifying the write? Any error messages 
in the logs?

On Thu, Aug 10, 2017 at 3:17 AM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Hi,

I’m running Spark in cluster mode on 4 nodes, and I’m trying to write CSV 
files to the nodes’ local paths (not HDFS).
I’m using spark.write.csv to write the CSV files.

On the master node:
spark.write.csv creates a folder with the CSV file name and writes many files with 
the part-r-000n suffix. This is okay for me; I can merge them later.
But on the worker nodes:
spark.write.csv creates a folder with the CSV file name and writes 
many folders and files under _temporary/0/. This is not okay for me.
Could someone please suggest what could be going wrong in my 
settings, and how I can write CSV files to the specified folder rather than to 
subfolders (_temporary/0/task_xxx) on the worker machines?

Thank you,
Hemanth




--
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre minds." 
- Albert Einstein.


Re: spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-10 Thread Hemanth Gudela
Yes, I have tried file:/// with the full path, as well as just the full path 
without the file:/// prefix.
The Spark session has been closed; no luck though ☹

Regards,
Hemanth

From: Femi Anthony <femib...@gmail.com>
Date: Thursday, 10 August 2017 at 11.06
To: Hemanth Gudela <hemanth.gud...@qvantel.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: spark.write.csv is not able write files to specified path, but is 
writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

Is your filePath prefaced with file:/// and the full path, or is it relative?

You might also try calling close() on the Spark context or session at the end of 
the program execution to try and ensure that cleanup is completed.
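
For example (the path here is just a placeholder):

myDataFrame.write.mode("overwrite").csv("file:///home/user/output/myResult")
spark.stop()  // stop the session/context at the end so cleanup can complete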

Sent from my iPhone

On Aug 10, 2017, at 3:58 AM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Thanks for the reply, Femi!

I’m writing the file like this --> 
myDataFrame.write.mode("overwrite").csv("myFilePath")
There are absolutely no errors/warnings after the write.

A _SUCCESS file is created on the master node, but the _temporary problem is 
seen only on the worker nodes.

I know spark.write.csv works best with HDFS, but with the current setup I have 
in my environment, I have to deal with Spark writing to the nodes’ local file systems 
and not to HDFS.

Regards,
Hemanth

From: Femi Anthony <femib...@gmail.com<mailto:femib...@gmail.com>>
Date: Thursday, 10 August 2017 at 10.38
To: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: spark.write.csv is not able write files to specified path, but is 
writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

Normally the _temporary directory gets deleted as part of the cleanup when the 
write is complete and a SUCCESS file is created. I suspect that the writes are 
not properly completed. How are you specifying the write? Any error messages 
in the logs?

On Thu, Aug 10, 2017 at 3:17 AM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Hi,

I’m running Spark in cluster mode on 4 nodes, and I’m trying to write CSV 
files to the nodes’ local paths (not HDFS).
I’m using spark.write.csv to write the CSV files.

On the master node:
spark.write.csv creates a folder with the CSV file name and writes many files with 
the part-r-000n suffix. This is okay for me; I can merge them later.
But on the worker nodes:
spark.write.csv creates a folder with the CSV file name and writes 
many folders and files under _temporary/0/. This is not okay for me.
Could someone please suggest what could be going wrong in my 
settings, and how I can write CSV files to the specified folder rather than to 
subfolders (_temporary/0/task_xxx) on the worker machines?

Thank you,
Hemanth




--
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre minds." 
- Albert Einstein.


Re: spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-10 Thread Hemanth Gudela
Thanks for the reply, Femi!

I’m writing the file like this --> 
myDataFrame.write.mode("overwrite").csv("myFilePath")
There are absolutely no errors/warnings after the write.

A _SUCCESS file is created on the master node, but the _temporary problem is 
seen only on the worker nodes.

I know spark.write.csv works best with HDFS, but with the current setup I have 
in my environment, I have to deal with Spark writing to the nodes’ local file systems 
and not to HDFS.

Regards,
Hemanth

From: Femi Anthony <femib...@gmail.com>
Date: Thursday, 10 August 2017 at 10.38
To: Hemanth Gudela <hemanth.gud...@qvantel.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: spark.write.csv is not able write files to specified path, but is 
writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

Normally the _temporary directory gets deleted as part of the cleanup when the 
write is complete and a SUCCESS file is created. I suspect that the writes are 
not properly completed. How are you specifying the write? Any error messages 
in the logs?

On Thu, Aug 10, 2017 at 3:17 AM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Hi,

I’m running Spark in cluster mode on 4 nodes, and I’m trying to write CSV 
files to the nodes’ local paths (not HDFS).
I’m using spark.write.csv to write the CSV files.

On the master node:
spark.write.csv creates a folder with the CSV file name and writes many files with 
the part-r-000n suffix. This is okay for me; I can merge them later.
But on the worker nodes:
spark.write.csv creates a folder with the CSV file name and writes 
many folders and files under _temporary/0/. This is not okay for me.
Could someone please suggest what could be going wrong in my 
settings, and how I can write CSV files to the specified folder rather than to 
subfolders (_temporary/0/task_xxx) on the worker machines?

Thank you,
Hemanth




--
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre minds." 
- Albert Einstein.


spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-10 Thread Hemanth Gudela
Hi,

I’m running Spark in cluster mode on 4 nodes, and I’m trying to write CSV 
files to the nodes’ local paths (not HDFS).
I’m using spark.write.csv to write the CSV files.

On the master node:
spark.write.csv creates a folder with the CSV file name and writes many files with 
the part-r-000n suffix. This is okay for me; I can merge them later.
But on the worker nodes:
spark.write.csv creates a folder with the CSV file name and writes 
many folders and files under _temporary/0/. This is not okay for me.
Could someone please suggest what could be going wrong in my 
settings, and how I can write CSV files to the specified folder rather than to 
subfolders (_temporary/0/task_xxx) on the worker machines?

Thank you,
Hemanth



Re: Spark SQL - Global Temporary View is not behaving as expected

2017-04-24 Thread Hemanth Gudela
Hello Gene,

Thanks, but Alluxio did not solve my Spark streaming use case, because my source 
parquet files in Alluxio in-memory storage are not "appended" but are periodically 
"overwritten" due to the nature of the business need.
Spark jobs fail when trying to read parquet files at the same time as another 
job is writing parquet files in Alluxio.

Could you suggest a way to synchronize parquet reads and writes in Alluxio 
in-memory storage, i.e. when one Spark job is writing a dataframe as a parquet file in 
Alluxio, the other Spark jobs trying to read it must wait until the 
write is finished?

Thanks,
Hemanth

From: Gene Pang <gene.p...@gmail.com>
Date: Monday, 24 April 2017 at 16.41
To: vincent gromakowski <vincent.gromakow...@gmail.com>
Cc: Hemanth Gudela <hemanth.gud...@qvantel.com>, "user@spark.apache.org" 
<user@spark.apache.org>, Felix Cheung <felixcheun...@hotmail.com>
Subject: Re: Spark SQL - Global Temporary View is not behaving as expected

As Vincent mentioned, Alluxio helps with sharing data across different Spark 
contexts. This blog post about Spark dataframes and Alluxio discusses that use 
case<https://alluxio.com/blog/effective-spark-dataframes-with-alluxio>.

Thanks,
Gene

On Sat, Apr 22, 2017 at 2:14 AM, vincent gromakowski 
<vincent.gromakow...@gmail.com<mailto:vincent.gromakow...@gmail.com>> wrote:
Look at alluxio for sharing across drivers or spark jobserver

On 22 Apr 2017 at 10:24 AM, "Hemanth Gudela" 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Thanks for your reply.

Creating a table is an option, but such an approach slows down reads & writes for 
the real-time analytics streaming use case that I’m currently working on.
If the global temporary view were accessible across 
sessions/Spark contexts, that would have simplified my use case a lot.

But yeah, thanks for explaining the behavior of the global temporary view, now it’s 
clear ☺

-Hemanth

From: Felix Cheung <felixcheun...@hotmail.com<mailto:felixcheun...@hotmail.com>>
Date: Saturday, 22 April 2017 at 11.05
To: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Spark SQL - Global Temporary View is not behaving as expected

Cross-session in this context means multiple Spark sessions from the same Spark 
context. Since you are running two shells, you have different Spark 
contexts.
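
In other words, within a single application a second session created from the same context can see the global temp view -- a quick sketch (names are placeholders):

spark.sql("select 1 as col1").createGlobalTempView("gView1")
val anotherSession = spark.newSession()                       // same SparkContext, new session
anotherSession.sql("select * from global_temp.gView1").show() // works: shared across sessions of one context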

Do you have to use a temp view? Could you create a table?

_
From: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>
Sent: Saturday, April 22, 2017 12:57 AM
Subject: Spark SQL - Global Temporary View is not behaving as expected
To: <user@spark.apache.org<mailto:user@spark.apache.org>>


Hi,

According to 
documentation<http://spark.apache.org/docs/latest/sql-programming-guide.html#global-temporary-view>,
 global temporary views are cross-session accessible.

But when I try to query a global temporary view from another spark shell like 
this-->
Instance 1 of spark-shell
--
scala> spark.sql("select 1 as col1").createGlobalTempView("gView1")

Instance 2 of spark-shell (while Instance 1 of spark-shell is still alive)
-
scala> spark.sql("select * from global_temp.gView1").show()
org.apache.spark.sql.AnalysisException: Table or view not found: 
`global_temp`.`gView1`
'Project [*]
+- 'UnresolvedRelation `global_temp`.`gView1`

I am expecting that a global temporary view created in shell 1 should be 
accessible in shell 2, but it isn’t!
Please correct me if I am missing something here.

Thanks (in advance),
Hemanth




Re: How to maintain order of key-value in DataFrame same as JSON?

2017-04-24 Thread Hemanth Gudela
Hi,

One option, if you can use it, is to force the DataFrame to use the column order you prefer, 
like this:

DataFrame df = sqlContext.read().json(jsonPath).select("name","salary","occupation","address");
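
You can then verify that the resulting schema follows the selected order (just an illustration):

df.printSchema()   // prints name, salary, occupation, address in that order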

-Hemanth

From: Devender Yadav 
Date: Monday, 24 April 2017 at 15.45
To: "user@spark.apache.org" 
Subject: How to maintain order of key-value in DataFrame same as JSON?


{"name": "dev","salary": 100,"occupation": "engg","address": "noida"}

{"name": "karthik","salary": 200,"occupation": "engg","address": "blore"}


Spark registered view in "Future" - View changes updated in "Future" are lost in main thread

2017-04-24 Thread Hemanth Gudela
Hi,

I’m trying to write a background thread using a "Future" which periodically 
re-registers a view with the latest data from the underlying database table.
However, the data changes made in the "Future" thread are lost in the main thread.

In the code below:

1.   In the beginning, the registered view "myView" has only 1 row (1, 'a'), 
which is shown as the first output

+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+

2.   After a minute, a background "Future" thread inserts a new row (2, 
'b') into the database table, and then re-registers "myView" with the latest updates from 
the underlying table.

a.   The second output clearly shows that "myView" in the "Future" thread has 2 
rows
  +---+-----+
  |id |value|
  +---+-----+
  |1  |a    |
  |2  |b    |
  +---+-----+



3.   After 2 minutes, when I query "myView" in the main thread, it doesn’t 
show the newly added row (2, 'b'), even though "myView" had picked up the changes in 
the "Future" thread. As you can observe, the third output shows only one row (1, 
'a') again!

+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val url = "jdbc:mysql://localhost:3306/myDb?sessionVariables=sql_mode='ANSI'" 
//mysql jdbc url

//set database properties
val dbProperties = new java.util.Properties
dbProperties.setProperty("user","myUser")
dbProperties.setProperty("password","myPass")

//read a database table, register a temp view and cache it
spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView") 
 //register a temp view named "myView"
spark.sql("cache table myView")
spark.sql("select * from myView").show //in the beginning, myView has just 1 record
/*
+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+
*/

Future { //in a background thread, insert a new row into the database table, re-register the temp view and refresh the cache
  Thread.sleep(1000*60) //(not necessary but) wait for a minute

  //insert a new row (2, 'b') into the source table
  spark.sql("select 2 as id, 'b' as value").write.mode("append").jdbc(url,"testTable",dbProperties)
  spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView") //re-register "myView"
  spark.sql("cache table myView") //refresh cache of myView again
  spark.sql("select * from myView").show //myView now has 2 records
  /*
  +---+-----+
  |id |value|
  +---+-----+
  |1  |a    |
  |2  |b    |
  +---+-----+
  */
}
Thread.sleep(1000*60*2) //wait for 2 minutes
spark.sql("select * from myView").show //Why is myView having only 1 record!?!
/*
+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+
*/

I had assumed that a temp view registered in the "Future" thread is thread-local, 
but that doesn’t always seem to be the case.
When the data source is a database table, the data changes made to the view 
registered in the "Future" thread are lost in the main thread. However, when the data source 
is parquet, the changes made to the registered view persist even in the main 
thread.

Could you please shed some light on what’s happening with the behavior of the 
registered view when the data source is a database, and why the behavior is 
different when the data source is parquet?

Thank you (in advance ☺)
Hemanth





Re: Spark SQL - Global Temporary View is not behaving as expected

2017-04-22 Thread Hemanth Gudela
Thanks for your reply.

Creating a table is an option, but such an approach slows down reads & writes for 
the real-time analytics streaming use case that I’m currently working on.
If the global temporary view were accessible across 
sessions/Spark contexts, that would have simplified my use case a lot.

But yeah, thanks for explaining the behavior of the global temporary view, now it’s 
clear ☺

-Hemanth

From: Felix Cheung <felixcheun...@hotmail.com>
Date: Saturday, 22 April 2017 at 11.05
To: Hemanth Gudela <hemanth.gud...@qvantel.com>, "user@spark.apache.org" 
<user@spark.apache.org>
Subject: Re: Spark SQL - Global Temporary View is not behaving as expected

Cross-session in this context means multiple Spark sessions from the same Spark 
context. Since you are running two shells, you have different Spark 
contexts.

Do you have to use a temp view? Could you create a table?

_________
From: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>
Sent: Saturday, April 22, 2017 12:57 AM
Subject: Spark SQL - Global Temporary View is not behaving as expected
To: <user@spark.apache.org<mailto:user@spark.apache.org>>



Hi,

According to 
documentation<http://spark.apache.org/docs/latest/sql-programming-guide.html#global-temporary-view>,
 global temporary views are cross-session accessible.

But when I try to query a global temporary view from another spark shell like 
this-->
Instance 1 of spark-shell
--
scala> spark.sql("select 1 as col1").createGlobalTempView("gView1")

Instance 2 of spark-shell (while Instance 1 of spark-shell is still alive)
-
scala> spark.sql("select * from global_temp.gView1").show()
org.apache.spark.sql.AnalysisException: Table or view not found: 
`global_temp`.`gView1`
'Project [*]
+- 'UnresolvedRelation `global_temp`.`gView1`

I am expecting that a global temporary view created in shell 1 should be 
accessible in shell 2, but it isn’t!
Please correct me if I am missing something here.

Thanks (in advance),
Hemanth



Spark SQL - Global Temporary View is not behaving as expected

2017-04-22 Thread Hemanth Gudela
Hi,

According to 
documentation,
 global temporary views are cross-session accessible.

But when I try to query a global temporary view from another spark shell like 
this -->
Instance 1 of spark-shell
--
scala> spark.sql("select 1 as col1").createGlobalTempView("gView1")

Instance 2 of spark-shell (while Instance 1 of spark-shell is still alive)
-
scala> spark.sql("select * from global_temp.gView1").show()
org.apache.spark.sql.AnalysisException: Table or view not found: 
`global_temp`.`gView1`
'Project [*]
+- 'UnresolvedRelation `global_temp`.`gView1`

I am expecting that a global temporary view created in shell 1 should be 
accessible in shell 2, but it isn’t!
Please correct me if I am missing something here.

Thanks (in advance),
Hemanth


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-22 Thread Hemanth Gudela
Thank you Georg, Gene for your ideas.
For now, I am using ”Futures” to asynchronously run a background thread that 
periodically creates a new dataframe fetching latest data from underlying 
table, and re-registers temp view with the same name as used by main thread’s 
static dataframe.

This looks to be working for me now, but if this solution leads to other 
problems, I will look for persisted views in hive / Alluxio.
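
The skeleton of that background thread is roughly this (the table name, view name, refresh interval, and the url/dbProperties values are placeholders, not my exact code):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

Future {
  while (true) {
    Thread.sleep(5 * 60 * 1000)                  // refresh interval, e.g. every 5 minutes
    spark.read
      .jdbc(url, "sourceTable", dbProperties)    // re-read the latest data from the source table
      .createOrReplaceTempView("myStaticView")   // replace the view used by the streaming query
  }
}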

Regards,
Hemanth

From: Gene Pang <gene.p...@gmail.com>
Date: Saturday, 22 April 2017 at 0.30
To: Georg Heiler <georg.kf.hei...@gmail.com>
Cc: Hemanth Gudela <hemanth.gud...@qvantel.com>, Tathagata Das 
<tathagata.das1...@gmail.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

Hi Georg,

Yes, that should be possible with Alluxio. Tachyon was renamed to Alluxio.

This article on how Alluxio is used for a Spark streaming use 
case<https://www.alluxio.com/blog/qunar-performs-real-time-data-analytics-up-to-300x-faster-with-alluxio>
 may be helpful.

Thanks,
Gene

On Fri, Apr 21, 2017 at 8:22 AM, Georg Heiler 
<georg.kf.hei...@gmail.com<mailto:georg.kf.hei...@gmail.com>> wrote:
You could write your views to hive or maybe tachyon.

Is the periodically updated data big?

Hemanth Gudela <hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> 
wrote on Fri, 21 Apr 2017 at 16:55:
Being new to spark, I think I need your suggestion again.

#2 you can always define a batch Dataframe and register it as a view, and then 
run a background thread that periodically creates a new Dataframe with updated data 
and re-registers it as a view with the same name

I seem to have misunderstood your statement: I tried registering a static 
dataframe as a temp view ("myTempView") using createOrReplaceTempView in one Spark 
session, and tried re-registering another, refreshed dataframe as a temp view with 
the same name ("myTempView") in another session. However, with this approach, I 
failed to achieve what I’m aiming for, because such views are local to one 
Spark session.
From Spark 2.1.0 onwards, the global view is a nice feature, but it still would not 
solve my problem, because a global view cannot be updated.

So after much thinking, I understood that you must have meant running a 
background process in the same Spark job that would periodically create 
a new dataframe and re-register the temp view with the same name, within the same Spark 
session.
Could you please give me some pointers to documentation on how to create such 
an asynchronous background process in Spark streaming? Is Scala’s "Futures" the 
way to achieve this?

Thanks,
Hemanth


From: Tathagata Das 
<tathagata.das1...@gmail.com<mailto:tathagata.das1...@gmail.com>>

Date: Friday, 21 April 2017 at 0.03
To: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>
Cc: Georg Heiler <georg.kf.hei...@gmail.com<mailto:georg.kf.hei...@gmail.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>

Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

Here are a couple of ideas.
1. You can set up a Structured Streaming query to update an in-memory table.
Look at the memory sink in the programming guide - 
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
So you can query the latest table using a specified table name, and also join 
that table with another stream (see the sketch after idea 2 below). However, note that this in-memory table is 
maintained in the driver, and so you have to be careful about the size of the 
table.

2. If you cannot define a streaming query on the slow-moving data due to the 
unavailability of a connector for your streaming data source, then you can always 
define a batch Dataframe and register it as a view, and then run a background 
thread that periodically creates a new Dataframe with updated data and re-registers it 
as a view with the same name. Any streaming query that joins a streaming 
dataframe with the view will automatically start using the most updated data as 
soon as the view is updated.
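
For idea 1, a minimal sketch of the memory sink (the query name is a placeholder; "complete" output mode applies when the streaming query aggregates, otherwise use "append"):

streamingDF.writeStream
  .format("memory")
  .queryName("latestTable")      // name of the in-memory table maintained on the driver
  .outputMode("complete")
  .start()

// the latest contents can then be queried (or joined) by name:
spark.sql("select * from latestTable").show()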

Hope this helps.


On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Thanks Georg for your reply.
But I’m not sure if I fully understood your answer.

If you meant to join two streams (one reading Kafka, and another reading 
database table), then I think it’s not possible, because

1.   According to 
documentation<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#data-sources>,
 Structured streaming does not support database as a streaming source

2.   Joining between two streams is not possible yet.

Regards,
Hemanth

From: Georg Heiler <georg.kf.hei...@gmail.com<mailto:

Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-21 Thread Hemanth Gudela
Being new to spark, I think I need your suggestion again.

#2 you can always define a batch Dataframe and register it as a view, and then 
run a background thread that periodically creates a new Dataframe with updated data 
and re-registers it as a view with the same name

I seem to have misunderstood your statement: I tried registering a static 
dataframe as a temp view ("myTempView") using createOrReplaceTempView in one Spark 
session, and tried re-registering another, refreshed dataframe as a temp view with 
the same name ("myTempView") in another session. However, with this approach, I 
failed to achieve what I’m aiming for, because such views are local to one 
Spark session.
From Spark 2.1.0 onwards, the global view is a nice feature, but it still would not 
solve my problem, because a global view cannot be updated.

So after much thinking, I understood that you must have meant running a 
background process in the same Spark job that would periodically create 
a new dataframe and re-register the temp view with the same name, within the same Spark 
session.
Could you please give me some pointers to documentation on how to create such 
an asynchronous background process in Spark streaming? Is Scala’s "Futures" the 
way to achieve this?

Thanks,
Hemanth


From: Tathagata Das <tathagata.das1...@gmail.com>
Date: Friday, 21 April 2017 at 0.03
To: Hemanth Gudela <hemanth.gud...@qvantel.com>
Cc: Georg Heiler <georg.kf.hei...@gmail.com>, "user@spark.apache.org" 
<user@spark.apache.org>
Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

Here are a couple of ideas.
1. You can set up a Structured Streaming query to update an in-memory table.
Look at the memory sink in the programming guide - 
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
So you can query the latest table using a specified table name, and also join 
that table with another stream. However, note that this in-memory table is 
maintained in the driver, and so you have to be careful about the size of the 
table.

2. If you cannot define a streaming query on the slow-moving data due to the 
unavailability of a connector for your streaming data source, then you can always 
define a batch Dataframe and register it as a view, and then run a background 
thread that periodically creates a new Dataframe with updated data and re-registers it 
as a view with the same name. Any streaming query that joins a streaming 
dataframe with the view will automatically start using the most updated data as 
soon as the view is updated.

Hope this helps.


On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Thanks Georg for your reply.
But I’m not sure if I fully understood your answer.

If you meant to join two streams (one reading Kafka, and another reading 
database table), then I think it’s not possible, because

1.   According to 
documentation<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#data-sources>,
 Structured streaming does not support database as a streaming source

2.   Joining between two streams is not possible yet.

Regards,
Hemanth

From: Georg Heiler <georg.kf.hei...@gmail.com<mailto:georg.kf.hei...@gmail.com>>
Date: Thursday, 20 April 2017 at 23.11
To: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

What about treating the static data as a (slow) stream as well?

Hemanth Gudela <hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> 
wrote on Thu, 20 Apr 2017 at 22:09:
Hello,

I am working on a use case where there is a need to join streaming data frame 
with a static data frame.
The streaming data frame continuously gets data from Kafka topics, whereas 
static data frame fetches data from a database table.

However, as the underlying database table is getting updated often, I must 
somehow manage to refresh my static data frame periodically to get the latest 
information from underlying database table.

My questions:

1.   Is it possible to periodically refresh static data frame?

2.   If refreshing the static data frame is not possible, is there a mechanism 
to automatically stop & restart the Spark structured streaming job, so that 
every time the job restarts, the static data frame gets updated with the latest 
information from the underlying database table?

3.   If 1) and 2) are not possible, please suggest alternatives to achieve 
my requirement described above.

Thanks,
Hemanth



Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Hemanth Gudela
Idea #2 probably suits my needs better, because

-  Streaming query does not have a source database connector yet

-  My source database table is big, so the in-memory table could be huge 
for the driver to handle.

Thanks for the cool ideas, TD!

Regards,
Hemanth

From: Tathagata Das <tathagata.das1...@gmail.com>
Date: Friday, 21 April 2017 at 0.03
To: Hemanth Gudela <hemanth.gud...@qvantel.com>
Cc: Georg Heiler <georg.kf.hei...@gmail.com>, "user@spark.apache.org" 
<user@spark.apache.org>
Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

Here are a couple of ideas.
1. You can set up a Structured Streaming query to update an in-memory table.
Look at the memory sink in the programming guide - 
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
So you can query the latest table using a specified table name, and also join 
that table with another stream. However, note that this in-memory table is 
maintained in the driver, and so you have to be careful about the size of the 
table.

2. If you cannot define a streaming query on the slow-moving data due to the 
unavailability of a connector for your streaming data source, then you can always 
define a batch Dataframe and register it as a view, and then run a background 
thread that periodically creates a new Dataframe with updated data and re-registers it 
as a view with the same name. Any streaming query that joins a streaming 
dataframe with the view will automatically start using the most updated data as 
soon as the view is updated.

Hope this helps.


On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> wrote:
Thanks Georg for your reply.
But I’m not sure if I fully understood your answer.

If you meant to join two streams (one reading Kafka, and another reading 
database table), then I think it’s not possible, because

1.   According to 
documentation<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#data-sources>,
 Structured streaming does not support database as a streaming source

2.   Joining between two streams is not possible yet.

Regards,
Hemanth

From: Georg Heiler <georg.kf.hei...@gmail.com<mailto:georg.kf.hei...@gmail.com>>
Date: Thursday, 20 April 2017 at 23.11
To: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

What about treating the static data as a (slow) stream as well?

Hemanth Gudela <hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> 
wrote on Thu, 20 Apr 2017 at 22:09:
Hello,

I am working on a use case where there is a need to join streaming data frame 
with a static data frame.
The streaming data frame continuously gets data from Kafka topics, whereas 
static data frame fetches data from a database table.

However, as the underlying database table is getting updated often, I must 
somehow manage to refresh my static data frame periodically to get the latest 
information from underlying database table.

My questions:

1.   Is it possible to periodically refresh static data frame?

2.   If refreshing the static data frame is not possible, is there a mechanism 
to automatically stop & restart the Spark structured streaming job, so that 
every time the job restarts, the static data frame gets updated with the latest 
information from the underlying database table?

3.   If 1) and 2) are not possible, please suggest alternatives to achieve 
my requirement described above.

Thanks,
Hemanth



Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Hemanth Gudela
Thanks Georg for your reply.
But I’m not sure if I fully understood your answer.

If you meant to join two streams (one reading Kafka, and another reading 
database table), then I think it’s not possible, because

1.   According to 
documentation<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#data-sources>,
 Structured streaming does not support database as a streaming source

2.   Joining between two streams is not possible yet.

Regards,
Hemanth

From: Georg Heiler <georg.kf.hei...@gmail.com>
Date: Thursday, 20 April 2017 at 23.11
To: Hemanth Gudela <hemanth.gud...@qvantel.com>, "user@spark.apache.org" 
<user@spark.apache.org>
Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

What about treating the static data as a (slow) stream as well?

Hemanth Gudela <hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>> 
wrote on Thu, 20 Apr 2017 at 22:09:
Hello,

I am working on a use case where there is a need to join streaming data frame 
with a static data frame.
The streaming data frame continuously gets data from Kafka topics, whereas 
static data frame fetches data from a database table.

However, as the underlying database table is getting updated often, I must 
somehow manage to refresh my static data frame periodically to get the latest 
information from underlying database table.

My questions:

1.   Is it possible to periodically refresh static data frame?

2.   If refreshing the static data frame is not possible, is there a mechanism 
to automatically stop & restart the Spark structured streaming job, so that 
every time the job restarts, the static data frame gets updated with the latest 
information from the underlying database table?

3.   If 1) and 2) are not possible, please suggest alternatives to achieve 
my requirement described above.

Thanks,
Hemanth


Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Hemanth Gudela
Hello,

I am working on a use case where there is a need to join streaming data frame 
with a static data frame.
The streaming data frame continuously gets data from Kafka topics, whereas 
static data frame fetches data from a database table.

However, as the underlying database table is getting updated often, I must 
somehow manage to refresh my static data frame periodically to get the latest 
information from underlying database table.

My questions:

1.   Is it possible to periodically refresh static data frame?

2.   If refreshing the static data frame is not possible, is there a mechanism 
to automatically stop & restart the Spark structured streaming job, so that 
every time the job restarts, the static data frame gets updated with the latest 
information from the underlying database table?

3.   If 1) and 2) are not possible, please suggest alternatives to achieve 
my requirement described above.

Thanks,
Hemanth


Re: Does spark 2.1.0 structured streaming support jdbc sink?

2017-04-10 Thread Hemanth Gudela
Many thanks Silvio for the link. That’s exactly what I’m looking for. ☺
However, there is no mention of checkpoint support for a custom "ForeachWriter" 
in structured streaming. I’m going to test that now.

Good question Gary, this is what is mentioned in the 
link<https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html>:
Often times we want to be able to write output of streams to external databases 
such as MySQL. At the time of writing, the Structured Streaming API does not 
support external databases as sinks; however, when it does, the API option will 
be as simple as .format("jdbc").start("jdbc:mysql/..").
In the meantime, we can use the foreach sink to accomplish this. Let’s create a 
custom JDBC Sink that extends ForeachWriter and implements its methods.
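
For reference, here is a rough sketch of such a ForeachWriter-based sink (the JDBC URL, credentials, table and column layout below are placeholders, not the blog's exact code):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JdbcSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // open one connection per partition/epoch
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.prepareStatement("insert into my_aggregates (key, cnt) values (?, ?)")
    true
  }

  override def process(value: Row): Unit = {
    // map the aggregated row to the placeholder table's columns
    statement.setString(1, value.getString(0))
    statement.setLong(2, value.getLong(1))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (connection != null) connection.close()
  }
}

streamingDF.writeStream
  .outputMode("complete")
  .foreach(new JdbcSink("jdbc:mysql://host:3306/mydb", "user", "pass"))
  .start()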

I’m not sure though whether the jdbc sink feature will be available in the upcoming Spark 
(2.2.0?) version or not.
It would be good to know if someone has information about it.

Thanks,
Hemanth

From: "lucas.g...@gmail.com" <lucas.g...@gmail.com>
Date: Monday, 10 April 2017 at 8.24
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Does spark 2.1.0 structured streaming support jdbc sink?

Interesting, does anyone know if we'll be seeing the JDBC sinks in upcoming 
releases?

Thanks!

Gary Lucas

On 9 April 2017 at 13:52, Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:
JDBC sink is not in 2.1. You can see here for an example implementation using 
the ForEachWriter sink instead: 
https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html


From: Hemanth Gudela 
<hemanth.gud...@qvantel.com<mailto:hemanth.gud...@qvantel.com>>
Date: Sunday, April 9, 2017 at 4:30 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Does spark 2.1.0 structured streaming support jdbc sink?

Hello Everyone,
I am new to Spark, especially spark streaming.

I am trying to read an input stream from Kafka, perform windowed aggregations 
in spark using structured streaming, and finally write aggregates to a sink.

-  MySQL as an output sink doesn’t seem to be an option, because this 
block of code throws an error

streamingDF.writeStream.format("jdbc").start("jdbc:mysql…”)

java.lang.UnsupportedOperationException: Data source jdbc does not support 
streamed writing

This is strange because 
this<http://rxin.github.io/talks/2016-02-18_spark_summit_streaming.pdf> 
document shows that jdbc is supported as an output sink!



-  Parquet doesn’t seem to be an option, because it doesn’t support 
"complete" output mode, but "append" only. As I’m performing windowed 
aggregations in spark streaming, the output mode has to be "complete" and cannot 
be "append".


-  Memory and console sinks are good for debugging, but are not 
suitable for production jobs.

So, please correct me if I’m missing something in my code to enable the jdbc output 
sink.
If the jdbc output sink is not an option, please suggest an alternative output sink 
that suits my needs better.

Or since structured streaming is still ‘alpha’, should I resort to spark 
dstreams to achieve my use case described above.
Please suggest.

Thanks in advance,
Hemanth



Does spark 2.1.0 structured streaming support jdbc sink?

2017-04-09 Thread Hemanth Gudela
Hello Everyone,
I am new to Spark, especially spark streaming.

I am trying to read an input stream from Kafka, perform windowed aggregations 
in spark using structured streaming, and finally write aggregates to a sink.

-  MySQL as an output sink doesn’t seem to be an option, because this 
block of code throws an error

streamingDF.writeStream.format("jdbc").start("jdbc:mysql…”)

java.lang.UnsupportedOperationException: Data source jdbc does not support 
streamed writing

This is strange because this 
document shows that jdbc is supported as an output sink!



-  Parquet doesn’t seem to be an option, because it doesn’t support 
"complete" output mode, but "append" only. As I’m performing windowed 
aggregations in spark streaming, the output mode has to be "complete" and cannot 
be "append".


-  Memory and console sinks are good for debugging, but are not 
suitable for production jobs.

So, please correct me if I’m missing something in my code to enable the jdbc output 
sink.
If the jdbc output sink is not an option, please suggest an alternative output sink 
that suits my needs better.

Or since structured streaming is still ‘alpha’, should I resort to spark 
dstreams to achieve my use case described above.
Please suggest.

Thanks in advance,
Hemanth


Re: df.count() returns one more count than SELECT COUNT()

2017-04-06 Thread Hemanth Gudela
Nulls are excluded with spark.sql("SELECT count(distinct col) FROM 
Table").show()
I think it is ANSI SQL behaviour.

scala> spark.sql("select distinct count(null)").show(false)
+-----------+
|count(NULL)|
+-----------+
|0          |
+-----------+

scala> spark.sql("select distinct null").count
res1: Long = 1
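
A concrete illustration with a small made-up dataset (the view name is just an example):

scala> Seq(Some("a"), None, Some("b")).toDF("col").createOrReplaceTempView("Table")

scala> spark.sql("SELECT count(distinct col) FROM Table").show()
// returns 2 -- the null is excluded

scala> spark.table("Table").count
// returns 3 -- df.count() counts the row holding the null too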

Regards,
Hemanth

From: Mohamed Nadjib Mami 
Date: Thursday, 6 April 2017 at 20.29
To: "user@spark.apache.org" 
Subject: df.count() returns one more count than SELECT COUNT()

spark.sql("SELECT count(distinct col) FROM Table").show()