Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
I can't reproduce the issue with my simple code:
```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(event.progress.id + " is on progress")
    println(s"My accu is ${myAcc.value} on query progress")
  }
  ...
})

def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
  ...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
  ...
```
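
(For reference, `myAcc` above is just a plain driver-side LongAccumulator; assuming the
session is `spark`, it can be created along these lines:)

```scala
// Driver-side accumulator referenced by the listener and mappingFunc above;
// the accumulator name is arbitrary.
val myAcc = spark.sparkContext.longAccumulator("MyAccumulator")
```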

I'm wondering whether any errors can be found in the driver logs. Exceptions
thrown inside a micro-batch won't terminate the running streaming job.
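
If the query did fail, one way to surface the error on the driver is to check the query
handle or the termination event -- a minimal sketch, where `query` stands for the
StreamingQuery returned by `writeStream ... start()`:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener

// Print the failure, if any, attached to the query handle.
query.exception.foreach(e => println(s"Query failed: ${e.getMessage}"))

// Or log the failure when the query terminates.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit =
    event.exception.foreach(msg => println(s"Query ${event.id} terminated with error: $msg"))
})
```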

For the following code, we have to make sure that `StateUpdateTask` is started:
> .mapGroupsWithState(
>     new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>         appConfig, accumulators),
>     Encoders.bean(ModelStateInfo.class),
>     Encoders.bean(ModelUpdate.class),
>     GroupStateTimeout.ProcessingTimeTimeout());

-- 
Cheers,
-z

On Thu, 28 May 2020 19:59:31 +0530
Srinivas V  wrote:

> Giving the code below:
> //accumulators is a class level variable in driver.
> 
> sparkSession.streams().addListener(new StreamingQueryListener() {
>     @Override
>     public void onQueryStarted(QueryStartedEvent queryStarted) {
>         logger.info("Query started: " + queryStarted.id());
>     }
>     @Override
>     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>         logger.info("Query terminated: " + queryTerminated.id());
>     }
>     @Override
>     public void onQueryProgress(QueryProgressEvent queryProgress) {
>         accumulators.eventsReceived(queryProgress.progress().numInputRows());
>         long eventsReceived = 0;
>         long eventsExpired = 0;
>         long eventSentSuccess = 0;
>         try {
>             eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>             eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>             eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>         } catch (MissingKeyException e) {
>             logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>         }
>         logger.info("Events Received:{}", eventsReceived);
>         logger.info("Events State Expired:{}", eventsExpired);
>         logger.info("Events Sent Success:{}", eventSentSuccess);
>         logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>             queryProgress.progress().batchId(),
>             queryProgress.progress().numInputRows(),
>             queryProgress.progress().inputRowsPerSecond(),
>             queryProgress.progress().processedRowsPerSecond(),
>             queryProgress.progress().durationMs());
> 
> 
> On Thu, May 28, 2020 at 7:04 PM ZHANG Wei  wrote:
> 
> > May I get how the accumulator is accessed in the method
> > `onQueryProgress()`?
> >
> > AFAICT, the accumulator is incremented well. There is a way to verify that
> > in cluster like this:
> > ```
> > // Add the following while loop before invoking awaitTermination
> > while (true) {
> >   println("My acc: " + myAcc.value)
> >   Thread.sleep(5 * 1000)
> > }
> >
> > //query.awaitTermination()
> > ```
> >
> > And the accumulator value updated can be found from driver stdout.
> >
> > --
> > Cheers,
> > -z
> >
> > On Thu, 28 May 2020 17:12:48 +0530
> > Srinivas V  wrote:
> >
> > > yes, I am using stateful structured streaming. Yes similar to what you
> > do.
> > > This is in Java
> > > I do it this way:
> > > Dataset productUpdates = watermarkedDS
> > > .groupByKey(
> > > (MapFunction) event ->
> > > event.getId(), Encoders.STRING())
> > > .mapGroupsWithState(
> > > new
> > >
> > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > appConfig, accumulators),
> > > Encoders.bean(ModelStateInfo.class),
> > > Encoders.bean(ModelUpdate.class),
> > > GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > StateUpdateTask contains the update method.
> > >
> > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > 

[Apache Spark][Streaming Job][Checkpoint]Spark job failed on Checkpoint recovery with Batch not found error

2020-05-28 Thread taylorwu
Hi,

We have a Spark 2.4 job that fails on checkpoint recovery every few hours with
the following errors (from the driver log):

driver spark-kubernetes-driver ERROR 20:38:51 ERROR MicroBatchExecution:
Query impressionUpdate [id = 54614900-4145-4d60-8156-9746ffc13d1f, runId =
3637c2f3-49b6-40c2-b6d0-7edb28361c5d] terminated with error
java.lang.IllegalStateException: batch 946 doesn't exist
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:406)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

And the executor logs show this error:

 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

How should I fix this?






External hive metastore (remote) managed tables

2020-05-28 Thread Debajyoti Roy
Hi, does anyone know the behavior of dropping managed tables when an
external (remote) Hive metastore is used?

Does the deletion of the data (e.g. from the object store) happen from Spark
SQL, or from the external Hive metastore?

I'm confused by the local-mode and remote-mode code paths.


Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Something Something
I am assuming StateUpdateTask is your application-specific class. Does it
have an 'updateState' method or something similar? I googled but couldn't
find any documentation about doing it this way. Can you please direct me to
some documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
> Dataset productUpdates = watermarkedDS
> .groupByKey(
> (MapFunction) event ->
> event.getId(), Encoders.STRING())
> .mapGroupsWithState(
> new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
> Encoders.bean(ModelStateInfo.class),
> Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Yes, that's exactly how I am creating them.
>>
>> Question... Are you using 'Stateful Structured Streaming' in which you've
>> something like this?
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
>> this only under 'Stateful Structured Streaming'. In other streaming 
>> applications it works as expected.
>>
>>
>>
>> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>>
>>> Yes, I am talking about Application specific Accumulators. Actually I am
>>> getting the values printed in my driver log as well as sent to Grafana. Not
>>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>>> cluster(not local Mac) where I submit from master node. It should work the
>>> same for cluster mode as well.
>>> Create accumulators like this:
>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
 Hmm... how would they go to Graphana if they are not getting computed
 in your code? I am talking about the Application Specific Accumulators. The
 other standard counters such as 'event.progress.inputRowsPerSecond' are
 getting populated correctly!

 On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:

> Hello,
> Even for me it comes as 0 when I print in OnQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> But one consolation is that when I send metrics to Graphana, the
> values are coming there.
>
> On Tue, May 26, 2020 at 3:10 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> No this is not working even if I use LongAccumulator.
>>
>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>
>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>> be atomic or thread safe. I'm wondering if the implementation for
>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>> replace
>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>> LongAccumulator[3]
>>> and test if the StreamingListener and other codes are able to work?
>>>
>>> ---
>>> Cheers,
>>> -z
>>> [1]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>> [2]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>> [3]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>
>>> 
>>> From: Something Something 
>>> Sent: Saturday, May 16, 2020 0:38
>>> To: spark-user
>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>
>>> Can someone from Spark Development team tell me if this
>>> functionality is supported and tested? I've spent a lot of time on this 
>>> but
>>> can't get it to work. Just to add more context, we've our own 
>>> Accumulator
>>> class that extends from AccumulatorV2. In this class we keep track of 
>>> one
>>> or more accumulators. Here's the definition:
>>>
>>>
>>> class CollectionLongAccumulator[T]
>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>
>>> When the job begins we register an instance of this class:
>>>
>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>
>>> Is this working under Structured Streaming?
>>>
>>> I will keep looking for alternate approaches but any help would be
>>> greatly appreciated. Thanks.
>>>
>>>
>>>
>>> On Thu, May 14, 2020 at 2:36 PM Something Something <

Re: Spark dataframe hdfs vs s3

2020-05-28 Thread Kanwaljit Singh
You can’t play much if it is a streaming job. But in case of batch jobs, 
sometimes teams will copy their S3 data to HDFS in prep for the next run :D
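
For a batch job, that copy step can be sketched roughly as below (the bucket and paths
are placeholders, and a configured s3a connector is assumed):

```scala
// Hypothetical locations; adjust to your environment.
val s3Path   = "s3a://my-bucket/input/data.parquet"
val hdfsPath = "hdfs:///data/staging/data.parquet"

// One-off copy from S3 to HDFS before the next batch run...
spark.read.parquet(s3Path)
  .write.mode("overwrite").parquet(hdfsPath)

// ...and then the job reads from HDFS (data locality) instead of S3.
val df = spark.read.parquet(hdfsPath)
```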

From: randy clinton 
Date: Thursday, May 28, 2020 at 5:50 AM
To: Dark Crusader 
Cc: Jörn Franke , user 
Subject: Re: Spark dataframe hdfs vs s3

See if this helps

"That is to say, on a per node basis, HDFS can yield 6X higher read throughput 
than S3. Thus, given that the S3 is 10x cheaper than HDFS, we find that S3 is 
almost 2x better compared to HDFS on performance per dollar."

https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html


On Wed, May 27, 2020, 9:51 PM Dark Crusader <relinquisheddra...@gmail.com> wrote:
Hi Randy,

Yes, I'm using parquet on both S3 and hdfs.

On Thu, 28 May, 2020, 2:38 am randy clinton, <randyclin...@gmail.com> wrote:
Is the file Parquet on S3 or is it some other file format?

In general I would assume that HDFS read/writes are more performant for spark 
jobs.

For instance, consider how well partitioned your HDFS file is vs the S3 file.

On Wed, May 27, 2020 at 1:51 PM Dark Crusader <relinquisheddra...@gmail.com> wrote:
Hi Jörn,

Thanks for the reply. I will try to create an easier example to reproduce the 
issue.

I will also try your suggestion to look into the UI. Can you guide on what I 
should be looking for?

I was already using the s3a protocol to compare the times.

My hunch is that multiple reads from S3 are required because of improper 
caching of intermediate data. And maybe hdfs is doing a better job at this. 
Does this make sense?

I would also like to add that we built an extra layer on S3 which might be 
adding to even slower times.

Thanks for your help.

On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jornfra...@gmail.com> wrote:
Have you looked in Spark UI why this is the case ?
S3 Reading can take more time - it depends also what s3 url you are using : s3a 
vs s3n vs S3.

It could help after some calculation to persist in-memory or on HDFS. You can 
also initially load from S3 and store on HDFS and work from there .

HDFS offers Data locality for the tasks, ie the tasks start on the nodes where 
the data is. Depending on what s3 „protocol“ you are using you might be also 
more punished with performance.

Try s3a as a protocol (replace all s3n with s3a).

You can also use s3 url but this requires a special bucket configuration, a 
dedicated empty bucket and it lacks some interoperability with other AWS 
services.

Nevertheless, it could be also something else with the code. Can you post an 
example reproducing the issue?

> On 27.05.2020 at 18:18, Dark Crusader <relinquisheddra...@gmail.com> wrote:
>
>
> Hi all,
>
> I am reading data from hdfs in the form of parquet files (around 3 GB) and 
> running an algorithm from the spark ml library.
>
> If I create the same spark dataframe by reading data from S3, the same 
> algorithm takes considerably more time.
>
> I don't understand why this is happening. Is this a chance occurrence or are 
> the spark dataframes created different?
>
> I don't understand how the data store would effect the algorithm performance.
>
> Any help would be appreciated. Thanks a lot.


--
I appreciate your time,

~Randy


Re: CSV parsing issue

2020-05-28 Thread Sean Owen
I don't think so, that data is inherently ambiguous and incorrectly
formatted. If you know something about the structure, maybe you can rewrite
the middle column manually to escape the inner quotes and then reparse.
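
If every record really has the shape `<col1>,"<json>",<col3>`, one rough way to do that
reparse is sketched below -- it assumes column1 and column3 never contain commas and
that each record sits on a single line:

```scala
import spark.implicits._

// Read the file as raw text and split each record on the first and last comma,
// treating everything in between as the JSON column.
val parsed = spark.read.textFile("/FileStore/tables/sample_file_structure.csv")
  .filter(line => !line.startsWith("column1"))          // drop the header row
  .map { line =>
    val first = line.indexOf(',')
    val last  = line.lastIndexOf(',')
    val col1  = line.substring(0, first)
    val col2  = line.substring(first + 1, last).stripPrefix("\"").stripSuffix("\"")
    val col3  = line.substring(last + 1)
    (col1, col2, col3)
  }
  .toDF("column1", "column2", "column3")

parsed.show(false)
```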

On Thu, May 28, 2020 at 10:25 AM elango vaidyanathan 
wrote:

> Is there any way I can handle it in code?
>
> Thanks,
> Elango
>
> On Thu, May 28, 2020, 8:52 PM Sean Owen  wrote:
>
>> Your data doesn't escape double-quotes.
>>
>> On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan 
>> wrote:
>>
>>>
>>> Hi team,
>>>
>>> I am loading an CSV. One column contains a json value. I am unable to
>>> parse that column properly. Below is the details. Can you please check once?
>>>
>>>
>>>
>>> val df1=spark.read.option("inferSchema","true").
>>> option("header","true").option("quote", "\"")
>>>
>>> .option("escape",
>>> "\"").csv("/FileStore/tables/sample_file_structure.csv")
>>>
>>>
>>>
>>> sample data:
>>>
>>> 
>>>
>>> column1,column2,column3
>>>
>>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" :
>>> "abcdef",   "language" : "en" }",11
>>>
>>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" :
>>> "ghi, jkl",   "language" : "en" }",12 123456789,"{   "moveId" :
>>> "123456789",   "dob" : null,   "username" : "mno, pqr",   "language" : "en"
>>> }",13
>>>
>>>
>>>
>>> output:
>>>
>>> ---
>>>
>>> +-++---+
>>>
>>> | column1| column2| column3 |
>>>
>>> +-++---+
>>>
>>> |123456789|"{ "moveId" : "...| "dob" : null|
>>>
>>> |123456789|"{ "moveId" : "...| "dob" : null|
>>>
>>> +-++---+
>>>
>>>
>>>
>>> Thanks,
>>> Elango
>>>
>>


Re: CSV parsing issue

2020-05-28 Thread elango vaidyanathan
Is there any way I can handle it in code?

Thanks,
Elango

On Thu, May 28, 2020, 8:52 PM Sean Owen  wrote:

> Your data doesn't escape double-quotes.
>
> On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan 
> wrote:
>
>>
>> Hi team,
>>
>> I am loading an CSV. One column contains a json value. I am unable to
>> parse that column properly. Below is the details. Can you please check once?
>>
>>
>>
>> val df1=spark.read.option("inferSchema","true").
>> option("header","true").option("quote", "\"")
>>
>> .option("escape", "\"").csv("/FileStore/tables/sample_file_structure.csv")
>>
>>
>>
>> sample data:
>>
>> 
>>
>> column1,column2,column3
>>
>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" :
>> "abcdef",   "language" : "en" }",11
>>
>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" :
>> "ghi, jkl",   "language" : "en" }",12 123456789,"{   "moveId" :
>> "123456789",   "dob" : null,   "username" : "mno, pqr",   "language" : "en"
>> }",13
>>
>>
>>
>> output:
>>
>> ---
>>
>> +-++---+
>>
>> | column1| column2| column3 |
>>
>> +-++---+
>>
>> |123456789|"{ "moveId" : "...| "dob" : null|
>>
>> |123456789|"{ "moveId" : "...| "dob" : null|
>>
>> +-++---+
>>
>>
>>
>> Thanks,
>> Elango
>>
>


Re: CSV parsing issue

2020-05-28 Thread Sean Owen
Your data doesn't escape double-quotes.

On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan 
wrote:

>
> Hi team,
>
> I am loading an CSV. One column contains a json value. I am unable to
> parse that column properly. Below is the details. Can you please check once?
>
>
>
> val df1=spark.read.option("inferSchema","true").
> option("header","true").option("quote", "\"")
>
> .option("escape", "\"").csv("/FileStore/tables/sample_file_structure.csv")
>
>
>
> sample data:
>
> 
>
> column1,column2,column3
>
> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" :
> "abcdef",   "language" : "en" }",11
>
> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" :
> "ghi, jkl",   "language" : "en" }",12 123456789,"{   "moveId" :
> "123456789",   "dob" : null,   "username" : "mno, pqr",   "language" : "en"
> }",13
>
>
>
> output:
>
> ---
>
> +-++---+
>
> | column1| column2| column3 |
>
> +-++---+
>
> |123456789|"{ "moveId" : "...| "dob" : null|
>
> |123456789|"{ "moveId" : "...| "dob" : null|
>
> +-++---+
>
>
>
> Thanks,
> Elango
>


CSV parsing issue

2020-05-28 Thread elango vaidyanathan
Hi team,

I am loading a CSV file. One column contains a JSON value, and I am unable to
parse that column properly. The details are below. Can you please take a look?



val df1 = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("/FileStore/tables/sample_file_structure.csv")



sample data:



column1,column2,column3
123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "abcdef",   "language" : "en" }",11
123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "ghi, jkl",   "language" : "en" }",12
123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "mno, pqr",   "language" : "en" }",13



output:

+---------+--------------------+-------------+
|  column1|             column2|      column3|
+---------+--------------------+-------------+
|123456789|  "{ "moveId" : "...| "dob" : null|
|123456789|  "{ "moveId" : "...| "dob" : null|
+---------+--------------------+-------------+



Thanks,
Elango


Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
Giving the code below:
//accumulators is a class level variable in driver.

sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        logger.info("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        logger.info("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        accumulators.eventsReceived(queryProgress.progress().numInputRows());
        long eventsReceived = 0;
        long eventsExpired = 0;
        long eventSentSuccess = 0;
        try {
            eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
            eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
            eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
        } catch (MissingKeyException e) {
            logger.error("Accumulator key not found due to Exception {}", e.getMessage());
        }
        logger.info("Events Received:{}", eventsReceived);
        logger.info("Events State Expired:{}", eventsExpired);
        logger.info("Events Sent Success:{}", eventSentSuccess);
        logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
            queryProgress.progress().batchId(),
            queryProgress.progress().numInputRows(),
            queryProgress.progress().inputRowsPerSecond(),
            queryProgress.progress().processedRowsPerSecond(),
            queryProgress.progress().durationMs());
    }
});


On Thu, May 28, 2020 at 7:04 PM ZHANG Wei  wrote:

> May I get how the accumulator is accessed in the method
> `onQueryProgress()`?
>
> AFAICT, the accumulator is incremented well. There is a way to verify that
> in cluster like this:
> ```
> // Add the following while loop before invoking awaitTermination
> while (true) {
>   println("My acc: " + myAcc.value)
>   Thread.sleep(5 * 1000)
> }
>
> //query.awaitTermination()
> ```
>
> And the accumulator value updated can be found from driver stdout.
>
> --
> Cheers,
> -z
>
> On Thu, 28 May 2020 17:12:48 +0530
> Srinivas V  wrote:
>
> > yes, I am using stateful structured streaming. Yes similar to what you
> do.
> > This is in Java
> > I do it this way:
> > Dataset productUpdates = watermarkedDS
> > .groupByKey(
> > (MapFunction) event ->
> > event.getId(), Encoders.STRING())
> > .mapGroupsWithState(
> > new
> >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > appConfig, accumulators),
> > Encoders.bean(ModelStateInfo.class),
> > Encoders.bean(ModelUpdate.class),
> > GroupStateTimeout.ProcessingTimeTimeout());
> >
> > StateUpdateTask contains the update method.
> >
> > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > mailinglist...@gmail.com> wrote:
> >
> > > Yes, that's exactly how I am creating them.
> > >
> > > Question... Are you using 'Stateful Structured Streaming' in which
> you've
> > > something like this?
> > >
> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > updateAcrossEvents
> > >   )
> > >
> > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> experiencing this only under 'Stateful Structured Streaming'. In other
> streaming applications it works as expected.
> > >
> > >
> > >
> > > On Wed, May 27, 2020 at 9:01 AM Srinivas V 
> wrote:
> > >
> > >> Yes, I am talking about Application specific Accumulators. Actually I
> am
> > >> getting the values printed in my driver log as well as sent to
> Grafana. Not
> > >> sure where and when I saw 0 before. My deploy mode is “client” on a
> yarn
> > >> cluster(not local Mac) where I submit from master node. It should
> work the
> > >> same for cluster mode as well.
> > >> Create accumulators like this:
> > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > >>
> > >>
> > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > >> mailinglist...@gmail.com> wrote:
> > >>
> > >>> Hmm... how would they go to Graphana if they are not getting
> computed in
> > >>> your code? I am talking about the Application Specific Accumulators.
> The
> > >>> other standard counters such as 'event.progress.inputRowsPerSecond'
> are
> > >>> getting populated correctly!
> > >>>
> > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> wrote:
> > >>>
> >  Hello,
> >  Even for me it comes as 0 when I print in OnQueryProgress. I use
> > 

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
May I ask how the accumulator is accessed in the `onQueryProgress()` method?

AFAICT, the accumulator is incremented correctly. Here is a way to verify that
in a cluster:
```
// Add the following while loop before invoking awaitTermination
while (true) {
  println("My acc: " + myAcc.value)
  Thread.sleep(5 * 1000)
}

//query.awaitTermination()
```

The updated accumulator value can then be found in the driver's stdout.

-- 
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530
Srinivas V  wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
> Dataset productUpdates = watermarkedDS
> .groupByKey(
> (MapFunction) event ->
> event.getId(), Encoders.STRING())
> .mapGroupsWithState(
> new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
> Encoders.bean(ModelStateInfo.class),
> Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
> 
> StateUpdateTask contains the update method.
> 
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
> 
> > Yes, that's exactly how I am creating them.
> >
> > Question... Are you using 'Stateful Structured Streaming' in which you've
> > something like this?
> >
> > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > updateAcrossEvents
> >   )
> >
> > And updating the Accumulator inside 'updateAcrossEvents'? We're 
> > experiencing this only under 'Stateful Structured Streaming'. In other 
> > streaming applications it works as expected.
> >
> >
> >
> > On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
> >
> >> Yes, I am talking about Application specific Accumulators. Actually I am
> >> getting the values printed in my driver log as well as sent to Grafana. Not
> >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> >> cluster(not local Mac) where I submit from master node. It should work the
> >> same for cluster mode as well.
> >> Create accumulators like this:
> >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> >>
> >>
> >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> >> mailinglist...@gmail.com> wrote:
> >>
> >>> Hmm... how would they go to Graphana if they are not getting computed in
> >>> your code? I am talking about the Application Specific Accumulators. The
> >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> >>> getting populated correctly!
> >>>
> >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
> >>>
>  Hello,
>  Even for me it comes as 0 when I print in OnQueryProgress. I use
>  LongAccumulator as well. Yes, it prints on my local but not on cluster.
>  But one consolation is that when I send metrics to Graphana, the values
>  are coming there.
> 
>  On Tue, May 26, 2020 at 3:10 AM Something Something <
>  mailinglist...@gmail.com> wrote:
> 
> > No this is not working even if I use LongAccumulator.
> >
> > On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
> >
> >> There is a restriction in AccumulatorV2 API [1], the OUT type should
> >> be atomic or thread safe. I'm wondering if the implementation for
> >> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
> >> replace
> >> CollectionLongAccumulator by CollectionAccumulator[2] or 
> >> LongAccumulator[3]
> >> and test if the StreamingListener and other codes are able to work?
> >>
> >> ---
> >> Cheers,
> >> -z
> >> [1]
> >> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435%7C1%7C0%7C637262629816034378sdata=73AxOzjhvImCuhXPoMN%2Bm7%2BY3KYwwaoCvmYMoOEGDtU%3Dreserved=0
> >> [2]
> >> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulatordata=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435%7C1%7C0%7C637262629816034378sdata=BY%2BtYoheicPCByUh2YWlmezHhg9ruKIDlndKQD06N%2FM%3Dreserved=0
> >> [3]
> >> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulatordata=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435%7C1%7C0%7C637262629816034378sdata=IosZ%2Fs2CclFuHT8nL8btCU8Geh2%2FjV94DtwxEEoN8F8%3Dreserved=0
> >>
> >> 
> >> From: Something Something 
> >> Sent: Saturday, May 16, 

Re: Spark dataframe hdfs vs s3

2020-05-28 Thread randy clinton
See if this helps

"That is to say, on a per node basis, HDFS can yield 6X higher read
throughput than S3. Thus, *given that the S3 is 10x cheaper than HDFS, we
find that S3 is almost 2x better compared to HDFS on performance per
dollar."*

*https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html
*


On Wed, May 27, 2020, 9:51 PM Dark Crusader 
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, 
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS read/writes are more performant for
>> spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create a easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on S3 which might
>>> be adding to even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
>>> wrote:
>>>
 Have you looked in Spark UI why this is the case ?
 S3 Reading can take more time - it depends also what s3 url you are
 using : s3a vs s3n vs S3.

 It could help after some calculation to persist in-memory or on HDFS.
 You can also initially load from S3 and store on HDFS and work from there .

 HDFS offers Data locality for the tasks, ie the tasks start on the
 nodes where the data is. Depending on what s3 „protocol“ you are using you
 might be also more punished with performance.

 Try s3a as a protocol (replace all s3n with s3a).

 You can also use s3 url but this requires a special bucket
 configuration, a dedicated empty bucket and it lacks some ineroperability
 with other AWS services.

 Nevertheless, it could be also something else with the code. Can you
 post an example reproducing the issue?

 > On 27.05.2020 at 18:18, Dark Crusader <relinquisheddra...@gmail.com> wrote:
 >
 > 
 > Hi all,
 >
 > I am reading data from hdfs in the form of parquet files (around 3
 GB) and running an algorithm from the spark ml library.
 >
 > If I create the same spark dataframe by reading data from S3, the
 same algorithm takes considerably more time.
 >
 > I don't understand why this is happening. Is this a chance occurence
 or are the spark dataframes created different?
 >
 > I don't understand how the data store would effect the algorithm
 performance.
 >
 > Any help would be appreciated. Thanks a lot.

>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
Yes, I am using stateful structured streaming, similar to what you do.
This is in Java; I do it this way:
Dataset productUpdates = watermarkedDS
    .groupByKey(
        (MapFunction) event -> event.getId(), Encoders.STRING())
    .mapGroupsWithState(
        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
            appConfig, accumulators),
        Encoders.bean(ModelStateInfo.class),
        Encoders.bean(ModelUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.
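
For readers wondering what such a class looks like: Spark's Java-friendly hook for
mapGroupsWithState is the MapGroupsWithStateFunction interface, whose single call()
method runs once per key per micro-batch on the executors. Below is a minimal sketch
in Scala for brevity -- the class and accumulator names are hypothetical and not the
actual StateUpdateTask, which is application-specific:

```scala
import org.apache.spark.api.java.function.MapGroupsWithStateFunction
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// Counts how many groups were processed via a driver-registered accumulator and
// keeps a running per-key count in the group state.
class CountingStateUpdate(acc: LongAccumulator)
    extends MapGroupsWithStateFunction[String, String, java.lang.Long, String] {

  override def call(key: String, values: java.util.Iterator[String],
                    state: GroupState[java.lang.Long]): String = {
    acc.add(1)                                        // runs on the executor
    var seen: Long = if (state.exists) state.get else 0L
    while (values.hasNext) { values.next(); seen += 1 }
    state.update(seen)                                // keep the running count per key
    s"$key -> $seen"
  }
}
```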

On Thu, May 28, 2020 at 4:41 AM Something Something <
mailinglist...@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question... Are you using 'Stateful Structured Streaming' in which you've
> something like this?
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
> this only under 'Stateful Structured Streaming'. In other streaming 
> applications it works as expected.
>
>
>
> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>
>> Yes, I am talking about Application specific Accumulators. Actually I am
>> getting the values printed in my driver log as well as sent to Grafana. Not
>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>> cluster(not local Mac) where I submit from master node. It should work the
>> same for cluster mode as well.
>> Create accumulators like this:
>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>
>>
>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Hmm... how would they go to Graphana if they are not getting computed in
>>> your code? I am talking about the Application Specific Accumulators. The
>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>> getting populated correctly!
>>>
>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>>
 Hello,
 Even for me it comes as 0 when I print in OnQueryProgress. I use
 LongAccumulator as well. Yes, it prints on my local but not on cluster.
 But one consolation is that when I send metrics to Graphana, the values
 are coming there.

 On Tue, May 26, 2020 at 3:10 AM Something Something <
 mailinglist...@gmail.com> wrote:

> No this is not working even if I use LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>
>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>> be atomic or thread safe. I'm wondering if the implementation for
>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>> replace
>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>> LongAccumulator[3]
>> and test if the StreamingListener and other codes are able to work?
>>
>> ---
>> Cheers,
>> -z
>> [1]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> [2]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> [3]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>
>> 
>> From: Something Something 
>> Sent: Saturday, May 16, 2020 0:38
>> To: spark-user
>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>
>> Can someone from Spark Development team tell me if this functionality
>> is supported and tested? I've spent a lot of time on this but can't get 
>> it
>> to work. Just to add more context, we've our own Accumulator class that
>> extends from AccumulatorV2. In this class we keep track of one or more
>> accumulators. Here's the definition:
>>
>>
>> class CollectionLongAccumulator[T]
>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>
>> When the job begins we register an instance of this class:
>>
>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>
>> Is this working under Structured Streaming?
>>
>> I will keep looking for alternate approaches but any help would be
>> greatly appreciated. Thanks.
>>
>>
>>
>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>> In my structured streaming job I am updating Spark Accumulators in
>> the updateAcrossEvents method but they are always 0 when I try to print
>> them in my StreamingListener. Here's the code:
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>>
>> The 

Re: Regarding Spark 3.0 GA

2020-05-28 Thread ARNAV NEGI SOFTWARE ARCHITECT
Thanks, Fabiano. I am building one myself; I will surely use yours as a quick
starter.

On Wed, 27 May 2020, 18:00 Gaetano Fabiano, 
wrote:

> I have no idea.
>
> I compiled a docker image that you can find on docker hub and you can do
> some experiments with it composing a cluster.
>
> https://hub.docker.com/r/gaetanofabiano/spark
>
> Let me know if you will have news about release
>
> Regards
>
> Sent from my iPhone
>
> Il giorno 27 mag 2020, alle ore 10:54, ARNAV NEGI SOFTWARE ARCHITECT <
> negi.ar...@gmail.com> ha scritto:
>
> 
> Hi,
>
> I am working on Spark 3.0 preview release for large Spark jobs on
> Kubernetes and preview looks promising.
>
> Can I understand when the Spark 3.0 GA is expected? Definitive dates will
> help us plan our roadmap with Spark 3.0.
>
>
> Arnav Negi / Technical Architect | Web Technology Enthusiast
> negi.ar...@gmail.com / +91-7045018844
>
>
> 
>
>
>


Re: Regarding Spark 3.0 GA

2020-05-28 Thread ARNAV NEGI SOFTWARE ARCHITECT
OK, thanks for the update, Sean.

Can I also track the RC vote?


On Wed, 27 May 2020, 18:12 Sean Owen,  wrote:

> No firm dates; it always depends on RC voting. Another RC is coming soon.
> It is however looking pretty close to done.
>
> On Wed, May 27, 2020 at 3:54 AM ARNAV NEGI SOFTWARE ARCHITECT <
> negi.ar...@gmail.com> wrote:
>
>> Hi,
>>
>> I am working on Spark 3.0 preview release for large Spark jobs on
>> Kubernetes and preview looks promising.
>>
>> Can I understand when the Spark 3.0 GA is expected? Definitive dates will
>> help us plan our roadmap with Spark 3.0.
>>
>>
>> Arnav Negi / Technical Architect | Web Technology Enthusiast
>> negi.ar...@gmail.com / +91-7045018844
>>
>>
>> 
>>
>>
>>