Re: Reading CSV with multiLine option invalidates encoding option.

2017-08-15 Thread Han-Cheol Cho
My apologies,

It was a problem of our Hadoop cluster.
When we tested the same code on another cluster (HDP-based), it worked
without any problem.

```scala
## make sjis text
cat a.txt
8月データだけでやってみよう
nkf -W -s a.txt >b.txt
cat b.txt
87n%G!<%?$@$1$G$d$C$F$_$h$&
nkf -s -w b.txt
8月データだけでやってみよう
hdfs dfs -put a.txt b.txt

## YARN mode test
spark.read.option("encoding", "utf-8").csv("a.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "sjis").csv("b.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "utf-8").option("multiLine",
true).csv("a.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "sjis").option("multiLine",
true).csv("b.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+
```

I am still digging into the root cause and will share it later :-)

Best wishes,
Han-Cheol


On Wed, Aug 16, 2017 at 1:32 PM, Han-Cheol Cho  wrote:

> Dear Spark ML members,
>
>
> I experienced a trouble in using "multiLine" option to load CSV data with
> Shift-JIS encoding.
> When option("multiLine", true) is specified, option("encoding",
> "encoding-name") just doesn't work anymore.
>
>
> In CSVDataSource.scala file, I found that MultiLineCSVDataSource.readFile()
> method doesn't use parser.options.charset at all.
>
> object MultiLineCSVDataSource extends CSVDataSource {
>   override val isSplitable: Boolean = false
>
>   override def readFile(
>   conf: Configuration,
>   file: PartitionedFile,
>   parser: UnivocityParser,
>   schema: StructType): Iterator[InternalRow] = {
> UnivocityParser.parseStream(
>   CodecStreams.createInputStreamWithCloseResource(conf,
> file.filePath),
>   parser.options.headerFlag,
>   parser,
>   schema)
>   }
>   ...
>
> On the other hand, TextInputCSVDataSource.readFile() method uses it:
>
>   override def readFile(
>   conf: Configuration,
>   file: PartitionedFile,
>   parser: UnivocityParser,
>   schema: StructType): Iterator[InternalRow] = {
> val lines = {
>   val linesReader = new HadoopFileLinesReader(file, conf)
>   Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ =>
> linesReader.close()))
>   linesReader.map { line =>
> new String(line.getBytes, 0, line.getLength,
> parser.options.charset)// < charset option is used here.
>   }
> }
>
> val shouldDropHeader = parser.options.headerFlag && file.start == 0
> UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
>   }
>
>
> It seems like a bug.
> Is there anyone who had the same problem before?
>
>
> Best wishes,
> Han-Cheol
>
> --
> ==
> Han-Cheol Cho, Ph.D.
> Data scientist, Data Science Team, Data Laboratory
> NHN Techorus Corp.
>
> Homepage: https://sites.google.com/site/priancho/
> ==
>



-- 
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==


Reading CSV with multiLine option invalidates encoding option.

2017-08-15 Thread Han-Cheol Cho
Dear Spark ML members,


I ran into trouble using the "multiLine" option to load CSV data with
Shift-JIS encoding.
When option("multiLine", true) is specified, option("encoding",
"encoding-name") no longer has any effect.


In the CSVDataSource.scala file, I found that the MultiLineCSVDataSource.readFile()
method doesn't use parser.options.charset at all.

object MultiLineCSVDataSource extends CSVDataSource {
  override val isSplitable: Boolean = false

  override def readFile(
      conf: Configuration,
      file: PartitionedFile,
      parser: UnivocityParser,
      schema: StructType): Iterator[InternalRow] = {
    UnivocityParser.parseStream(
      CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
      parser.options.headerFlag,
      parser,
      schema)
  }
  ...

On the other hand, the TextInputCSVDataSource.readFile() method does use it:

  override def readFile(
      conf: Configuration,
      file: PartitionedFile,
      parser: UnivocityParser,
      schema: StructType): Iterator[InternalRow] = {
    val lines = {
      val linesReader = new HadoopFileLinesReader(file, conf)
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
      linesReader.map { line =>
        new String(line.getBytes, 0, line.getLength,
          parser.options.charset)  // <-- charset option is used here
      }
    }

    val shouldDropHeader = parser.options.headerFlag && file.start == 0
    UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
  }
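As a standalone illustration of the difference (plain JDK classes only, nothing Spark-specific, and not a patch for Spark itself): the same Shift_JIS bytes decode correctly only when the charset is passed explicitly, which is exactly the step the multiLine path above never performs.

```scala
// Minimal sketch: decoding a byte stream with an explicit charset vs. UTF-8.
import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
import java.nio.charset.Charset

val text = "8月データだけでやってみよう"
val sjisBytes = text.getBytes(Charset.forName("Shift_JIS"))

// Decoded with the charset passed explicitly -> the original text comes back.
val ok = new BufferedReader(
  new InputStreamReader(new ByteArrayInputStream(sjisBytes), Charset.forName("Shift_JIS")))
println(ok.readLine())

// Decoded as UTF-8 (what happens when the charset option is ignored) -> mojibake.
val broken = new BufferedReader(
  new InputStreamReader(new ByteArrayInputStream(sjisBytes), Charset.forName("UTF-8")))
println(broken.readLine())
```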


This looks like a bug.
Has anyone run into the same problem before?


Best wishes,
Han-Cheol

-- 
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==


Re: SPIP: Spark on Kubernetes

2017-08-15 Thread lucas.g...@gmail.com
>From our perspective, we have invested heavily in Kubernetes as our cluster
manager of choice.

We also make quite heavy use of spark.  We've been experimenting with using
these builds (2.1 with pyspark enabled) quite heavily.  Given that we've
already 'paid the price' to operate Kubernetes in AWS it seems rational to
move our jobs over to spark on k8s.  Having this project merged into the
master will significantly ease keeping our Data Munging toolchain primarily
on Spark.


Gary Lucas
Data Ops Team Lead
Unbounce

On 15 August 2017 at 15:52, Andrew Ash  wrote:

> +1 (non-binding)
>
> We're moving large amounts of infrastructure from a combination of open
> source and homegrown cluster management systems to unify on Kubernetes and
> want to bring Spark workloads along with us.
>
> On Tue, Aug 15, 2017 at 2:29 PM, liyinan926  wrote:
>
>> +1 (non-binding)
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-developers
>> -list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-tp22147p22164.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Ok, thanks.

A few more questions:

1. When I looked into the documentation, it says onQueryProgress is not
thread-safe. So is this method the right place to refresh the cache, and is
there no need to restart the query if I choose the listener approach?

The methods are not thread-safe as they may be called from different
threads.


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala



2. If I use StreamingQueryListener.onQueryProgress, my understanding is that
the method is executed only while the query is in progress. So if I refresh
the data frame there without restarting the query, will it impact the
application?

3. Should I use the blocking unpersist(boolean) method or the async
unpersist() method, given that the data size is big?

I feel your solution (stop query --> refresh cache --> start query) is better
if I can accept a little downtime, even though the cached dataframe is huge.
I'm not sure how the listener behaves since it's asynchronous; correct me if
I'm wrong.
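For reference, a bare-bones sketch of the listener variant being discussed. Names such as loadData, shouldRefresh, and the read path are placeholders, not a tested recipe, and question 1 above (whether onQueryProgress is a safe place for this) still stands.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Refreshes a cached DataFrame between progress events; the streaming query keeps running.
class CacheRefreshListener(loadData: () => DataFrame, shouldRefresh: () => Boolean)
  extends StreamingQueryListener {

  @volatile private var cachedDf: DataFrame = _

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    if (shouldRefresh()) {
      if (cachedDf != null) cachedDf.unpersist()  // non-blocking by default
      cachedDf = loadData().persist()
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Hypothetical registration:
// spark.streams.addListener(
//   new CacheRefreshListener(() => spark.read.parquet("/path/to/data"), () => true))
```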

On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das 
wrote:

> Both works. The asynchronous method with listener will have less of down
> time, just that the first trigger/batch after the asynchronous
> unpersist+persist will probably take longer as it has to reload the data.
>
>
> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep 
> wrote:
>
>> Thanks tathagata das actually I'm planning to something like this
>>
>> activeQuery.stop()
>>
>> //unpersist and persist cached data frame
>>
>> df.unpersist()
>>
>> //read the updated data //data size of df is around 100gb
>>
>> df.persist()
>>
>>  activeQuery = startQuery()
>>
>>
>> the cached data frame size around 100gb ,so the question is this the
>> right place to refresh this huge cached data frame ?
>>
>> I'm also trying to refresh cached data frame in onqueryprogress() method
>> in a class which extends StreamingQuerylistner
>>
>> Would like to know which is the best place to refresh cached data frame
>> and why
>>
>> Thanks again for the below response
>>
>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You can do something like this.
>>>
>>>
>>> def startQuery(): StreamingQuery = {
>>>// create your streaming dataframes
>>>// start the query with the same checkpoint directory}
>>>
>>> // handle to the active queryvar activeQuery: StreamingQuery = null
>>> while(!stopped) {
>>>
>>>if (activeQuery = null) { // if query not active, start query
>>>  activeQuery = startQuery()
>>>
>>>} else if (shouldRestartQuery())  {  // check your condition and 
>>> restart query
>>>  activeQuery.stop()
>>>  activeQuery = startQuery()
>>>}
>>>
>>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>// if there is any error it will throw exception and quit the loop
>>>// otherwise it will keep checking the condition every 100ms}
>>>
>>>
>>>
>>>
>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
>>> wrote:
>>>
 Thanks Michael

 I guess my question is little confusing ..let me try again


 I would like to restart streaming query programmatically while my
 streaming application is running based on a condition and why I want to do
 this

 I want to refresh a cached data frame based on a condition and the best
 way to do this restart streaming query suggested by Tdas below for similar
 problem


 http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

 I do understand that checkpoint if helps in recovery and failures but I
 would like to know "how to restart streaming query programmatically without
 stopping my streaming application"

 In place of query.awaittermination should I need to have an logic to
 restart query? Please suggest


 On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
 mich...@databricks.com> wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <
> purna2prad...@gmail.com> wrote:
>
>> Hi,
>>
>>>
>>> I'm trying to restart a streaming query to refresh cached data frame
>>>
>>> Where and how should I restart streaming query
>>>
>>
>>
>> val sparkSes = SparkSession
>>
>>   .builder
>>
>>   .config("spark.master", "local")
>>
>>   .appName("StreamingCahcePoc")
>>
>>   .getOrCreate()
>>
>>
>>
>> import sparkSes.implicits._
>>
>>
>>
>> val dataDF = sparkSes.readStream
>>
>>   .schema(streamSchema)
>>
>>   .csv("testData")
>>
>>
>>
>>

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
Both work. The asynchronous method with the listener will have less downtime;
it's just that the first trigger/batch after the asynchronous unpersist+persist
will probably take longer, as it has to reload the data.


On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep 
wrote:

> Thanks tathagata das actually I'm planning to something like this
>
> activeQuery.stop()
>
> //unpersist and persist cached data frame
>
> df.unpersist()
>
> //read the updated data //data size of df is around 100gb
>
> df.persist()
>
>  activeQuery = startQuery()
>
>
> the cached data frame size around 100gb ,so the question is this the right
> place to refresh this huge cached data frame ?
>
> I'm also trying to refresh cached data frame in onqueryprogress() method
> in a class which extends StreamingQuerylistner
>
> Would like to know which is the best place to refresh cached data frame
> and why
>
> Thanks again for the below response
>
> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das 
> wrote:
>
>> You can do something like this.
>>
>>
>> def startQuery(): StreamingQuery = {
>>// create your streaming dataframes
>>// start the query with the same checkpoint directory}
>>
>> // handle to the active queryvar activeQuery: StreamingQuery = null
>> while(!stopped) {
>>
>>if (activeQuery = null) { // if query not active, start query
>>  activeQuery = startQuery()
>>
>>} else if (shouldRestartQuery())  {  // check your condition and 
>> restart query
>>  activeQuery.stop()
>>  activeQuery = startQuery()
>>}
>>
>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>// if there is any error it will throw exception and quit the loop
>>// otherwise it will keep checking the condition every 100ms}
>>
>>
>>
>>
>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
>> wrote:
>>
>>> Thanks Michael
>>>
>>> I guess my question is little confusing ..let me try again
>>>
>>>
>>> I would like to restart streaming query programmatically while my
>>> streaming application is running based on a condition and why I want to do
>>> this
>>>
>>> I want to refresh a cached data frame based on a condition and the best
>>> way to do this restart streaming query suggested by Tdas below for similar
>>> problem
>>>
>>> http://mail-archives.apache.org/mod_mbox/spark-user/
>>> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+
>>> fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>
>>> I do understand that checkpoint if helps in recovery and failures but I
>>> would like to know "how to restart streaming query programmatically without
>>> stopping my streaming application"
>>>
>>> In place of query.awaittermination should I need to have an logic to
>>> restart query? Please suggest
>>>
>>>
>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
>>> wrote:
>>>
 See https://spark.apache.org/docs/latest/structured-
 streaming-programming-guide.html#recovering-from-failures-
 with-checkpointing

 Though I think that this currently doesn't work with the console sink.

 On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep  wrote:

> Hi,
>
>>
>> I'm trying to restart a streaming query to refresh cached data frame
>>
>> Where and how should I restart streaming query
>>
>
>
> val sparkSes = SparkSession
>
>   .builder
>
>   .config("spark.master", "local")
>
>   .appName("StreamingCahcePoc")
>
>   .getOrCreate()
>
>
>
> import sparkSes.implicits._
>
>
>
> val dataDF = sparkSes.readStream
>
>   .schema(streamSchema)
>
>   .csv("testData")
>
>
>
>
>
>val query = counts.writeStream
>
>   .outputMode("complete")
>
>   .format("console")
>
>   .start()
>
>
> query.awaittermination()
>
>
>
>>
>>
>>

>>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks, Tathagata Das. Actually I'm planning to do something like this:

activeQuery.stop()

//unpersist and persist cached data frame

df.unpersist()

//read the updated data //data size of df is around 100gb

df.persist()

 activeQuery = startQuery()


The cached data frame's size is around 100 GB, so the question is: is this the
right place to refresh such a huge cached data frame?

I'm also trying to refresh the cached data frame in the onQueryProgress()
method of a class that extends StreamingQueryListener.

I would like to know which is the best place to refresh the cached data frame,
and why.

Thanks again for the below response

On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das 
wrote:

> You can do something like this.
>
>
> def startQuery(): StreamingQuery = {
>// create your streaming dataframes
>// start the query with the same checkpoint directory}
>
> // handle to the active queryvar activeQuery: StreamingQuery = null
> while(!stopped) {
>
>if (activeQuery = null) { // if query not active, start query
>  activeQuery = startQuery()
>
>} else if (shouldRestartQuery())  {  // check your condition and 
> restart query
>  activeQuery.stop()
>  activeQuery = startQuery()
>}
>
>activeQuery.awaitTermination(100)   // wait for 100 ms.
>// if there is any error it will throw exception and quit the loop
>// otherwise it will keep checking the condition every 100ms}
>
>
>
>
> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
> wrote:
>
>> Thanks Michael
>>
>> I guess my question is little confusing ..let me try again
>>
>>
>> I would like to restart streaming query programmatically while my
>> streaming application is running based on a condition and why I want to do
>> this
>>
>> I want to refresh a cached data frame based on a condition and the best
>> way to do this restart streaming query suggested by Tdas below for similar
>> problem
>>
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>
>> I do understand that checkpoint if helps in recovery and failures but I
>> would like to know "how to restart streaming query programmatically without
>> stopping my streaming application"
>>
>> In place of query.awaittermination should I need to have an logic to
>> restart query? Please suggest
>>
>>
>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
>> wrote:
>>
>>> See
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>
>>> Though I think that this currently doesn't work with the console sink.
>>>
>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
>>> wrote:
>>>
 Hi,

>
> I'm trying to restart a streaming query to refresh cached data frame
>
> Where and how should I restart streaming query
>


 val sparkSes = SparkSession

   .builder

   .config("spark.master", "local")

   .appName("StreamingCahcePoc")

   .getOrCreate()



 import sparkSes.implicits._



 val dataDF = sparkSes.readStream

   .schema(streamSchema)

   .csv("testData")





val query = counts.writeStream

   .outputMode("complete")

   .format("console")

   .start()


 query.awaittermination()



>
>
>
>>>
>


Hive Metastore open connections even after closing Spark context and session

2017-08-15 Thread Rohit Damkondwar
Hi. I am using Spark for querying Hive, followed by transformations. My
Scala app creates multiple Spark applications. A new Spark context (and
session) is created only after closing the previous SparkSession and
SparkContext.

However, on stopping sc and spark, connections to the Hive metastore
(MySQL) are somehow not destroyed properly. For every Spark app, I can see
around 5 MySQL connections being created (with the old connections still
active!). Eventually, MySQL starts rejecting new connections after 150 open
connections. How can I force Spark to close the Hive metastore connections
to MySQL (after spark.stop() and sc.stop())?

sc = spark context
spark = sparksession
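For context, a minimal sketch of the create-and-stop cycle described above (the query inside is a placeholder; the leaked metastore connections after stop() are exactly what this message is asking about):

```scala
import org.apache.spark.sql.SparkSession

def runOnce(appName: String): Unit = {
  val spark = SparkSession.builder()
    .appName(appName)
    .enableHiveSupport()            // connects to the MySQL-backed Hive metastore
    .getOrCreate()
  try {
    spark.sql("SELECT 1").show()    // placeholder for the real Hive query + transformations
  } finally {
    spark.stop()                    // also stops the underlying SparkContext
  }
}

// Called repeatedly, this would ideally leave no metastore connections behind,
// but in the setup described above roughly 5 MySQL connections survive each run.
```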


Regards,
Rohit S Damkondwar


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
You can do something like this.


def startQuery(): StreamingQuery = {
   // create your streaming dataframes
   // start the query with the same checkpoint directory
}

// handle to the active query
var activeQuery: StreamingQuery = null

while (!stopped) {

   if (activeQuery == null) {          // if query not active, start query
     activeQuery = startQuery()

   } else if (shouldRestartQuery()) {  // check your condition and restart query
     activeQuery.stop()
     activeQuery = startQuery()
   }

   activeQuery.awaitTermination(100)   // wait for 100 ms
   // if there is any error it will throw an exception and quit the loop
   // otherwise it will keep checking the condition every 100 ms
}




On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
wrote:

> Thanks Michael
>
> I guess my question is little confusing ..let me try again
>
>
> I would like to restart streaming query programmatically while my
> streaming application is running based on a condition and why I want to do
> this
>
> I want to refresh a cached data frame based on a condition and the best
> way to do this restart streaming query suggested by Tdas below for similar
> problem
>
> http://mail-archives.apache.org/mod_mbox/spark-user/
> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+
> fwmn4jtm1x1nd...@mail.gmail.com%3e
>
> I do understand that checkpoint if helps in recovery and failures but I
> would like to know "how to restart streaming query programmatically without
> stopping my streaming application"
>
> In place of query.awaittermination should I need to have an logic to
> restart query? Please suggest
>
>
> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
> wrote:
>
>> See https://spark.apache.org/docs/latest/structured-
>> streaming-programming-guide.html#recovering-from-failures-
>> with-checkpointing
>>
>> Though I think that this currently doesn't work with the console sink.
>>
>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
>> wrote:
>>
>>> Hi,
>>>

 I'm trying to restart a streaming query to refresh cached data frame

 Where and how should I restart streaming query

>>>
>>>
>>> val sparkSes = SparkSession
>>>
>>>   .builder
>>>
>>>   .config("spark.master", "local")
>>>
>>>   .appName("StreamingCahcePoc")
>>>
>>>   .getOrCreate()
>>>
>>>
>>>
>>> import sparkSes.implicits._
>>>
>>>
>>>
>>> val dataDF = sparkSes.readStream
>>>
>>>   .schema(streamSchema)
>>>
>>>   .csv("testData")
>>>
>>>
>>>
>>>
>>>
>>>val query = counts.writeStream
>>>
>>>   .outputMode("complete")
>>>
>>>   .format("console")
>>>
>>>   .start()
>>>
>>>
>>> query.awaittermination()
>>>
>>>
>>>



>>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks Michael

I guess my question was a little confusing... let me try again.


I would like to restart the streaming query programmatically, based on a
condition, while my streaming application is running. Here is why I want to
do this:

I want to refresh a cached data frame based on a condition, and the best way
to do this is to restart the streaming query, as suggested by TD below for a
similar problem:

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpointing helps with recovery and failures, but I
would like to know how to restart the streaming query programmatically
without stopping my streaming application.

In place of query.awaitTermination(), should I have logic to restart the
query? Please suggest.


On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
> wrote:
>
>> Hi,
>>
>>>
>>> I'm trying to restart a streaming query to refresh cached data frame
>>>
>>> Where and how should I restart streaming query
>>>
>>
>>
>> val sparkSes = SparkSession
>>
>>   .builder
>>
>>   .config("spark.master", "local")
>>
>>   .appName("StreamingCahcePoc")
>>
>>   .getOrCreate()
>>
>>
>>
>> import sparkSes.implicits._
>>
>>
>>
>> val dataDF = sparkSes.readStream
>>
>>   .schema(streamSchema)
>>
>>   .csv("testData")
>>
>>
>>
>>
>>
>>val query = counts.writeStream
>>
>>   .outputMode("complete")
>>
>>   .format("console")
>>
>>   .start()
>>
>>
>> query.awaittermination()
>>
>>
>>
>>>
>>>
>>>
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Michael Armbrust
See
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

Though I think that this currently doesn't work with the console sink.

On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
wrote:

> Hi,
>
>>
>> I'm trying to restart a streaming query to refresh cached data frame
>>
>> Where and how should I restart streaming query
>>
>
>
> val sparkSes = SparkSession
>
>   .builder
>
>   .config("spark.master", "local")
>
>   .appName("StreamingCahcePoc")
>
>   .getOrCreate()
>
>
>
> import sparkSes.implicits._
>
>
>
> val dataDF = sparkSes.readStream
>
>   .schema(streamSchema)
>
>   .csv("testData")
>
>
>
>
>
>val query = counts.writeStream
>
>   .outputMode("complete")
>
>   .format("console")
>
>   .start()
>
>
> query.awaittermination()
>
>
>
>>
>>
>>


Re: Spark 2.2 streaming with append mode: empty output

2017-08-15 Thread Ashwin Raju
The input dataset has multiple days' worth of data, so I thought the
watermark should have been crossed. To debug, I changed the query to the
code below. My expectation was that since I am doing 1-day windows with
late arrivals permitted for 1 second, when it sees records for the next
day, it would output a row for the previous day. When I run the code below
with 'complete' output mode, I see the table and then the lastProgress
output about 10 seconds later. When I run it with 'append' mode, the table
has no rows, but the lastProgress output is the same.

Is withWatermark ignored in 'complete' and 'update' modes?

For append mode, I'm not able to understand why the watermark entry is
still at 1970. "For a specific window starting at time T, the engine will
maintain state and allow late data to update the state until (max event
time seen by the engine - late threshold > T)". The table shows that we are
seeing events with timestamps well past 2017-05-18. At that point the
2017-05-17 window can be closed and the row output, right?
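Spelling out the arithmetic I expect, using the rule quoted above and the maximum timestamps in the table below:

    max event time seen          = 2017-05-21 23:40:08
    late threshold               = 1 second
    expected watermark           = 2017-05-21 23:40:07
    2017-05-17 window boundaries = 2017-05-17 00:00:00 / 2017-05-18 00:00:00  (both well below the expected watermark)

By that rule the [2017-05-17, 2017-05-18) window should be final and eligible for append-mode output, yet lastProgress still reports the watermark as 1970-01-01.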

grouped_df = frame \
    .withWatermark("timestamp", "1 second") \
    .groupby(F.window("timestamp", "1 day")) \
    .agg(F.max("timestamp"))

query = frame.writeStream \
    .format("console") \
    .option("truncate", False) \
    .option("checkpointLocation", CKPT_LOC) \
    .outputMode("complete") \
    .start()

import time
i = 0
while (i < 10):
    time.sleep(10)
    print(query.lastProgress)
    i += 1
query.awaitTermination()

The output:

+---------------------------------------------+-------------------+
|window                                       |max(timestamp)     |
+---------------------------------------------+-------------------+
|[2017-05-17 00:00:00.0,2017-05-18 00:00:00.0]|2017-05-17 23:59:59|
|[2017-05-15 00:00:00.0,2017-05-16 00:00:00.0]|2017-05-15 23:59:59|
|[2017-05-14 00:00:00.0,2017-05-15 00:00:00.0]|2017-05-14 23:59:59|
|[2017-05-16 00:00:00.0,2017-05-17 00:00:00.0]|2017-05-16 23:59:59|
|[2017-05-07 00:00:00.0,2017-05-08 00:00:00.0]|2017-05-07 23:59:59|
|[2017-05-19 00:00:00.0,2017-05-20 00:00:00.0]|2017-05-19 23:59:59|
|[2017-05-18 00:00:00.0,2017-05-19 00:00:00.0]|2017-05-18 23:59:59|
|[2017-05-20 00:00:00.0,2017-05-21 00:00:00.0]|2017-05-20 23:59:59|
|[2017-05-08 00:00:00.0,2017-05-09 00:00:00.0]|2017-05-08 23:59:59|
|[2017-05-10 00:00:00.0,2017-05-11 00:00:00.0]|2017-05-10 23:59:59|
|[2017-05-13 00:00:00.0,2017-05-14 00:00:00.0]|2017-05-13 23:59:57|
|[2017-05-21 00:00:00.0,2017-05-22 00:00:00.0]|2017-05-21 23:40:08|
|[2017-05-09 00:00:00.0,2017-05-10 00:00:00.0]|2017-05-09 23:59:59|
|[2017-05-12 00:00:00.0,2017-05-13 00:00:00.0]|2017-05-12 23:40:11|
|[2017-05-11 00:00:00.0,2017-05-12 00:00:00.0]|2017-05-11 23:59:59|
+---------------------------------------------+-------------------+

{u'stateOperators': [{u'numRowsTotal': 15, u'numRowsUpdated': 0}],
 u'eventTime': {u'watermark': u'1970-01-01T00:00:00.000Z'},
 u'name': None, u'timestamp': u'2017-08-15T18:13:54.381Z',
 u'processedRowsPerSecond': 0.0, u'inputRowsPerSecond': 0.0,
 u'numInputRows': 0,
 u'sources': [{u'description': u'FileStreamSource[hdfs://some_ip/some_path]',
   u'endOffset': {u'logOffset': 0}, u'processedRowsPerSecond': 0.0,
   u'inputRowsPerSecond': 0.0, u'numInputRows': 0,
   u'startOffset': {u'logOffset': 0}}],
 u'durationMs': {u'getOffset': 57, u'triggerExecution': 57},
 u'runId': u'a5b75404-c774-49db-aac5-2592211417ca',
 u'id': u'35ad86ec-f608-40b5-a48b-9507c82a87c8',
 u'sink': {u'description': u'org.apache.spark.sql.execution.streaming.ConsoleSink@7e4050cd'}}




On Mon, Aug 14, 2017 at 4:55 PM, Tathagata Das 
wrote:

> In append mode, the aggregation outputs a row only when the watermark has
> been crossed and the corresponding aggregate is *final*, that is, will not
> be updated any more.
> See http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#handling-late-data-and-watermarking
>
> On Mon, Aug 14, 2017 at 4:09 PM, Ashwin Raju  wrote:
>
>> Hi,
>>
>> I am running Spark 2.2 and trying out structured streaming. I have the
>> following code:
>>
>> from pyspark.sql import functions as F
>>
>> df=frame \
>>
>> .withWatermark("timestamp","1 minute") \
>>
>> .groupby(F.window("timestamp","1 day"),*groupby_cols) \
>>
>> .agg(f.sum('bytes'))
>>
>> query = frame.writeStream \
>>
>> .format("console")
>>
>> .option("checkpointLocation", '\some\chkpoint')
>>
>> .outputMode("complete")
>>
>> .start()
>>
>>
>>
>> query.awaitTermination()
>>
>>
>>
>> It prints out a bunch of aggregated rows to console. When I run the same
>> query with outputMode("append") however, the output only has the column
>> names, no rows. I was originally trying to output to parquet, which only
>> supports append mode. I was seeing no data in my parquet files, so I
>> switched to console output to debug, then noticed this issue. Am I
>> misunderstanding something about how append mode works?
>>
>>
>> Thanks,
>>
>> Ashwin
>>
>>
>


How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-15 Thread SRK
Hi,

How do I force Spark Kafka Direct to start from the latest offset when the lag
is huge in Kafka 0.10? It seems to be processing from the last offset stored
for a group id. One way to do this is to change the group id, but that would
mean that each time we need to process the job from the latest offset, we have
to provide a new group id.

Is there a way to force the job to run from the latest offset when we need to,
and still use the same group id?
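One possible approach (a sketch only and untested; class and method names are from the standard kafka-clients 0.10 consumer API and spark-streaming-kafka-0-10, and your kafkaParams would need the usual deserializer settings): look up the latest offsets yourself and hand them to the direct stream explicitly, keeping the same group id.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Fetch the log-end (latest) offset for every partition of the given topics.
def latestOffsets(topics: Set[String],
                  kafkaParams: Map[String, Object]): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams.asJava)
  try {
    val partitions = topics.flatMap { t =>
      consumer.partitionsFor(t).asScala.map(pi => new TopicPartition(pi.topic, pi.partition))
    }.toList
    consumer.assign(partitions.asJava)
    consumer.seekToEnd(partitions.asJava)                 // jump to the end of each partition
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  } finally {
    consumer.close()
  }
}

// Hypothetical wiring into the direct stream, same group id as before:
// val stream = KafkaUtils.createDirectStream[String, String](
//   ssc,
//   LocationStrategies.PreferConsistent,
//   ConsumerStrategies.Subscribe[String, String](
//     topics, kafkaParams, latestOffsets(topics, kafkaParams)))
```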

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Hi,

>
> I'm trying to restart a streaming query to refresh cached data frame
>
> Where and how should I restart streaming query
>


val sparkSes = SparkSession

  .builder

  .config("spark.master", "local")

  .appName("StreamingCahcePoc")

  .getOrCreate()



import sparkSes.implicits._



val dataDF = sparkSes.readStream

  .schema(streamSchema)

  .csv("testData")





   val query = counts.writeStream

  .outputMode("complete")

  .format("console")

  .start()


query.awaittermination()



>
>
>


Re: DAGScheduler - two runtimes

2017-08-15 Thread 周康
The ResultStage time is the time taken by your job's last stage.
The line "Job 13 finished: reduce at VertexRDDImpl.scala:90, took 0.035546 s"
gives the total time your job took.
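If you need the figure programmatically in milliseconds rather than parsing the log, a sketch (untested) using a SparkListener and the job start/end timestamps it receives:

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Records each job's start timestamp and prints its wall-clock duration in ms when it ends.
class JobTimeListener extends SparkListener {
  private val startTimes = TrieMap.empty[Int, Long]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    startTimes(jobStart.jobId) = jobStart.time            // epoch millis

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    startTimes.remove(jobEnd.jobId).foreach { t0 =>
      println(s"Job ${jobEnd.jobId} took ${jobEnd.time - t0} ms")
    }
}

// sc.addSparkListener(new JobTimeListener())   // register before triggering the job
```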

2017-08-14 18:58 GMT+08:00 Kaepke, Marc :

> Hi everyone,
>
> I’m a Spark newbie and have one question:
> What is the difference between the duration measures in my log/ console
> output?
>
> 17/08/14 12:48:58 INFO DAGScheduler: ResultStage 232 (reduce at
> VertexRDDImpl.scala:90) finished in 0.026 s
> 17/08/14 12:48:58 INFO DAGScheduler: Job 13 finished: reduce at
> VertexRDDImpl.scala:90, took 0.035546 s
>
> I need the total runtime of the job in ms. Or seconds with decimal
>
> Thanks!
>
> Best
> Marc
>
>
>


How to calculating CPU time for a Spark Job?

2017-08-15 Thread 钟文波
How can I calculate the CPU time for a Spark job? Is there any interface that
can be called directly?

Something like the Hadoop Map-Reduce framework, which provides the CPU time
spent (ms) in its Counters.
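One way to get something comparable (a sketch, assuming Spark 2.1+ where TaskMetrics exposes executorCpuTime in nanoseconds; untested): aggregate the per-task CPU time with a SparkListener.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sums executor CPU time (in nanoseconds) across all completed tasks.
class CpuTimeListener extends SparkListener {
  val totalCpuNanos = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for some failed tasks, so guard against it
    Option(taskEnd.taskMetrics).foreach(m => totalCpuNanos.addAndGet(m.executorCpuTime))
  }
}

// val listener = new CpuTimeListener()
// sc.addSparkListener(listener)
// ... run the job ...
// println(s"CPU time: ${listener.totalCpuNanos.get / 1e6} ms")
```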

Thanks!