Re: spark structured streaming GroupState returns weird values from state

2020-03-27 Thread Jungtaek Lim
Well, the code itself doesn't look right - you're using
ProductStateInformation as the state class, whereas you provide
ProductSessionInformation to the Encoder for the state.

On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim 
wrote:

> Could you play with Encoders.bean()? You can call Encoders.bean() with your
> class, and call .schema() on the return value to see how it translates to
> a Spark SQL schema. The schema must be consistent across multiple JVM
> runs for this to work properly, but I suspect the field order isn't retained.
>
> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V  wrote:
>
>> I am listening to a Kafka topic with a structured streaming application
>> in Java, testing it on my local Mac.
>> When I retrieve the GroupState object back with
>> state.get(), it gives some random values for the fields in the object:
>> some are interchanged, some are defaults, and some are junk values.
>>
>> See this example below:
>> While setting the state, I am setting:
>> ProductSessionInformation{requestId='222112345', productId='222112345',
>> priority='0', firstEventTimeMillis=1585312384,
>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>> numberOfEvents=1}
>>
>> When I retrieve it back, it comes like this:
>> ProductSessionInformation{requestId='some junk characters are coming
>> here' productId='222112345', priority='222112345',
>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>
>> Any clue why it might be happening? I have been stuck on this for a
>> couple of days. Immediate help is appreciated.
>>
>> code snippet:
>>
>>
>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>
>>  @Override
>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator,
>> GroupState<ProductStateInformation> state) throws Exception {
>>
>>
>>
>>   if (state.hasTimedOut()) {
>>
>> //
>>
>> }else{
>>
>> if (state.exists()) {
>> ProductStateInformation oldSession = state.get();
>> System.out.println("State for productId:"+productId + " with old values 
>> "+oldSession);
>>
>> }
>>
>>
>> public class EventsApp implements Serializable{
>>
>> public void run(String[] args) throws Exception {
>>
>> ...
>>
>>
>> Dataset<Row> dataSet = sparkSession
>> .readStream()
>> .format("kafka")
>> .option("kafka.bootstrap.servers", "localhost")
>> .option("startingOffsets","latest")
>> .option("failOnDataLoss", "false")
>> .option("subscribe", "topic1,topic2")
>> .option("includeTimestamp", true)
>>
>> .load();
>>
>>  eventsDS.groupByKey(
>> new MapFunction<Event, String>() {
>> @Override public String call(Event event) {
>> return event.getProductId();
>> }
>> }, Encoders.STRING())
>> .mapGroupsWithState(
>> new StateUpdateTask(3),
>> Encoders.bean(ProductSessionInformation.class),
>> Encoders.bean(ProductSessionUpdate.class),
>> GroupStateTimeout.ProcessingTimeTimeout());
>>
>> ...
>>
>>
>> StreamingQuery query = productUpdates
>> .writeStream()
>> .foreach(new ForeachWriter<ProductSessionUpdate>() {
>> @Override
>> public boolean open(long l, long l1) {return true;}
>>
>> @Override
>> public void process(ProductSessionUpdate productSessionUpdate) {
>> logger.info("-> query process: "+ productSessionUpdate);
>> }
>>
>> @Override
>> public void close(Throwable throwable) {}
>> })
>> .outputMode("update")
>> .option("checkpointLocation", checkpointDir)
>> .start();
>>
>> query.awaitTermination();
>>
>> }
>>
>> public class ProductStateInformation implements Serializable {
>>
>> protected String requestId;
>> protected String productId;
>> protected String priority;
>> protected long firstEventTimeMillis;
>> protected long lastEventTimeMillis;
>> protected long firstReceivedTimeMillis;
>> protected int numberOfEvents;
>>
>> ...//getter setters
>>
>> }
>>
>> These are the versions I am using:
>>
>> 2.3.1
>> 2.4.3
>>
>> 2.6.60.10.2.0
>>
>> 3.0.3
>>
>>


Re: OFF TOPIC LIST CRITERIA

2020-03-27 Thread Zahid Rahman
OK *user support. user@ is DONE !!!*

I actually reported a workaround for an existing bug to the experienced
user, and "the experienced user" was "not aware" of the setting in
log4j.properties, so he learned something new too.
Clearly neither were you.

Also, it may surprise some people, but there are people who have been
formally trained in software development.
We can tell a self-trained one a mile away.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Sat, 28 Mar 2020 at 03:02, Sean Owen  wrote:

> BCC user, dev, and I encourage others to not reply.
>
> I said _dev@_ is not for user support. user@ is. You heard that
> yesterday, too, and not to cross-post.
> You actually got answers to several questions, despite their tone,
> from experienced developers of the project.
>
> Messages like yours are, I assure you, not useful to _anybody_. If we
> let people talk like this on big community lists, yes _that_ will put
> up barriers.
> So, the answer for you is: you are not using either of these lists
> appropriately right now. If you can keep it civil and on-topic, use
> user@.
> Otherwise we will block you from the lists.
>
>
> Sean
>
> On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman  wrote:
> >
> >
> > Sean Owen says the criterion of these two mailing lists is not to help and
> > support somebody who is new, but to serve people who have been using the
> > software for a long time.
> >
> > He is implying, I think, that I should only send email when I find bugs so
> > that I can help him in his work.
> > A one-way street.
> >
> > He is suggesting that the more familiar you are with this software, the
> > more important you are.
> > Some kind of alpha-male hierarchy.
> >
> > He wants to put a barrier in place where the Apache Foundation wants no
> > barriers to free learning and free software.
> >
> > He has not reported any bugs, while I have reported so many in such a
> > short space of time.
> > He has warned me as well.
> >
> > So that Sean Owen does not put a barrier in place for me on my path to
> > free learning and free Apache software,
> > I would like somebody to clarify the criteria for me.
> >
> >
> > Backbutton.co.uk
> > ¯\_(ツ)_/¯
> > ♡۶Java♡۶RMI ♡۶
> > Make Use Method {MUM}
> > makeuse.org
>


Re: OFF TOPIC LIST CRITERIA

2020-03-27 Thread Sean Owen
BCC user, dev, and I encourage others to not reply.

I said _dev@_ is not for user support. user@ is. You heard that
yesterday, too, and not to cross-post.
You actually got answers to several questions, despite their tone,
from experienced developers of the project.

Messages like yours are, I assure you, not useful to _anybody_. If we
let people talk like this on big community lists, yes _that_ will put
up barriers.
So, the answer for you is: you are not using either of these lists
appropriately right now. If you can keep it civil and on-topic, use
user@.
Otherwise we will block you from the lists.


Sean

On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman  wrote:
>
>
> Sean Owen says the criterion of these two mailing lists is not to help and
> support somebody who is new, but to serve people who have been using the
> software for a long time.
>
> He is implying, I think, that I should only send email when I find bugs so
> that I can help him in his work.
> A one-way street.
>
> He is suggesting that the more familiar you are with this software, the
> more important you are.
> Some kind of alpha-male hierarchy.
>
> He wants to put a barrier in place where the Apache Foundation wants no
> barriers to free learning and free software.
>
> He has not reported any bugs, while I have reported so many in such a
> short space of time.
> He has warned me as well.
>
> So that Sean Owen does not put a barrier in place for me on my path to
> free learning and free Apache software,
> I would like somebody to clarify the criteria for me.
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org




OFF TOPIC LIST CRITERIA

2020-03-27 Thread Zahid Rahman
Sean Owen says the criterion of these two mailing lists is not to help and
support somebody who is new, but to serve people who have been using the
software for a long time.

He is implying, I think, that I should only send email when I find bugs so
that I can help him in his work.
A one-way street.

He is suggesting that the more familiar you are with this software, the
more important you are.
Some kind of alpha-male hierarchy.

He wants to put a barrier in place where the Apache Foundation wants no
barriers to free learning and free software.

He has not reported any bugs, while I have reported so many in such a short
space of time.
He has warned me as well.

So that Sean Owen does not put a barrier in place for me on my path to free
learning and free Apache software,
I would like somebody to clarify the criteria for me.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Best Practice: Evaluate Expression from Spark DataFrame Column

2020-03-27 Thread Chetan Khatri
Hi Spark Users,

I want to evaluate, for each row, an expression stored in a DataFrame column
against the other columns of the same DataFrame. Please suggest the best
approach to deal with this without impacting the performance of the job.

Thanks

Sample code:

val sampleDF = Seq(
  (8, 1, "bat", "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (64, 0, "mouse", "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (-27, 1, "horse" , "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (null, 0, "miki", "NUM IS NOT NULL AND FLAG IS NOT 1 AND WORD IS 'MIKI'")
).toDF("num", "flag", "word", "expression")

val derivedDF = sampleDF.withColumn("status", sampleDF.col("expression"))
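
One possible direction, sketched against the sample above (assuming the
expression strings are valid Spark SQL over the other columns and that a
SparkSession named spark is in scope; how well it performs will depend on
how many distinct expressions there are):

import org.apache.spark.sql.functions.{col, expr, lit, when}
import spark.implicits._

// Collect the distinct expression strings, then fold them into a single
// conditional column so each row is evaluated against its own expression.
val expressions = sampleDF.select("expression").distinct().as[String].collect()

val status = expressions.foldLeft(lit(null).cast("boolean")) { (acc, e) =>
  when(col("expression") === e, expr(e)).otherwise(acc)
}

val derivedWithStatusDF = sampleDF.withColumn("status", status)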


Re: what a plava !

2020-03-27 Thread Zahid Rahman
That confirms the three technologies are competing
for the same space, as I suspected but wasn't sure.
I can focus on the APIs and not waste any unnecessary time even
looking at Mesos and YARN.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Sat, 28 Mar 2020 at 02:07, Sean Owen  wrote:

> Spark standalone is a resource manager like YARN and Mesos. It is
> specific to Spark, and is therefore simpler, as it assumes it can take
> over whole machines.
> YARN and Mesos are for mediating resource usage across applications on
> a cluster, which may be running more than Spark apps.
>
> On Fri, Mar 27, 2020 at 7:30 PM Zahid Rahman  wrote:
> >
> > OK, thanks.
> >
> > On the issue of load balancing / clustering:
> >
> > I believe I can set up clustering like so:
> > sbin/start-master.sh
> > sbin/start-slave.sh spark://master:port
> >
> > another machine
> > sbin/start-slave.sh spark://master:port
> >
> > Do YARN and Mesos do anything different from that?
> >
> > The Spark clustering setup and YARN and Mesos - are they competing
> > technologies for the same space / functionality?
> >
>


Re: what a plava !

2020-03-27 Thread Sean Owen
Spark standalone is a resource manager like YARN and Mesos. It is
specific to Spark, and is therefore simpler, as it assumes it can take
over whole machines.
YARN and Mesos are for mediating resource usage across applications on
a cluster, which may be running more than Spark apps.
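
(For illustration only: the same application can be pointed at any of the
three resource managers just by changing the --master argument passed to
spark-submit; the host names, ports, class name and jar below are
placeholders.)

spark-submit --master spark://master-host:7077 --class com.example.MyApp my-app.jar
spark-submit --master yarn --deploy-mode cluster --class com.example.MyApp my-app.jar
spark-submit --master mesos://mesos-host:5050 --class com.example.MyApp my-app.jar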

On Fri, Mar 27, 2020 at 7:30 PM Zahid Rahman  wrote:
>
> OK, thanks.
>
> On the issue of load balancing / clustering:
>
> I believe I can set up clustering like so:
> sbin/start-master.sh
> sbin/start-slave.sh spark://master:port
>
> another machine
> sbin/start-slave.sh spark://master:port
>
> Do YARN and Mesos do anything different from that?
>
> The Spark clustering setup and YARN and Mesos - are they competing
> technologies for the same space / functionality?
>




Re: what a plava !

2020-03-27 Thread Zahid Rahman
OK, thanks.

On the issue of load balancing / clustering:

I believe I can set up clustering like so:
sbin/start-master.sh
sbin/start-slave.sh spark://master:port

*another machine*
sbin/start-slave.sh spark://master:port

Do YARN and Mesos do anything different from that?

The Spark clustering setup and YARN and Mesos - are they competing
technologies for the same space / functionality?

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 22:38, Sean Owen  wrote:

> - dev@, which is more for project devs to communicate. Cross-posting
> is discouraged too.
>
> The book isn't from the Spark OSS project, so not really the place to
> give feedback here.
>
> I don't quite understand the context of your other questions, but
> would elaborate them in individual, clear emails instead to increase
> the chance that someone will answer.
>
> On Fri, Mar 27, 2020 at 4:49 PM Zahid Rahman  wrote:
> >
> >
> > I was very impressed with the amount of material available from
> > https://github.com/databricks/Spark-The-Definitive-Guide/
> > over 450 megabytes.
> >
> > I have corrected the Scala code by adding
> > .sort(desc("sum(total_cost)")) to the code provided on page 34 (see below).
> >
> > I have noticed numerous uses of exclamation marks, almost to the point of
> > overuse. For example:
> > page 23: Let's specify some more transformations !
> > page 24: you've read your first explain plan !
> > page 26: Notice that these plans compile to the exact same underlying plan !
> > page 29: The last step is our action !
> > page 34: The best thing about structured streaming rapidly... with
> > virtually no code
> >
> > 1. I have never read a science book with such an emotion of frustration.
> > Is Spark, already difficult to understand, made more complicated by the
> > proliferation of languages: Scala, Java, Python, SQL, R?
> >
> > 2. Secondly, is the Spark architecture made more complex due to competing
> > technologies?
> >
> > I have a Spark cluster set up with a master and a slave to load-balance
> > heavy activity, like so:
> > sbin/start-master.sh
> > sbin/start-slave.sh spark://192.168.0.38:7077
> > For load balancing, I imagine (conceptually speaking, although I haven't
> > tried it), I can have as many slaves (workers) on other physical machines
> > as I like, simply by downloading the Spark zip file and running workers
> > from those other physical machine(s) with
> > sbin/start-slave.sh spark://192.168.0.38:7077.
> > My question is: under the circumstances, do I need to bother with Mesos
> > or YARN?
> >
> > Collins dictionary
> > The exclamation mark is used after exclamations and emphatic expressions.
> >
> > I can’t believe it!
> > Oh, no! Look at this mess!
> >
> > The exclamation mark loses its effect if it is overused. It is better to
> use a full stop after a sentence expressing mild excitement or humour.
> >
> > It was such a beautiful day.
> > I felt like a perfect banana.
> >
> >
> > import org.apache.spark.sql.SparkSession
> > import org.apache.spark.sql.functions.{window,column,desc,col}
> >
> > object RetailData {
> >
> >   def main(args: Array[String]): Unit = {
> >
> > val spark = 
> > SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail
> Data").getOrCreate();
> >
> > // create a static frame
> >   val staticDataFrame = spark.read.format("csv")
> > .option ("header","true")
> > .option("inferschema","true")
> > .load("/data/retail-data/by-day/*.csv")
> >
> > staticDataFrame.createOrReplaceTempView("retail_data")
> > val staticFrame = staticDataFrame.schema
> >
> > staticDataFrame
> >   .selectExpr(
> > "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
> >   .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
> >   .sum("total_cost")
> >   .sort(desc("sum(total_cost)"))
> >   .show(1)
> >
> >   } // main
> >
> > } // object
> >
> >
> >
> > Backbutton.co.uk
> > ¯\_(ツ)_/¯
> > ♡۶Java♡۶RMI ♡۶
> > Make Use Method {MUM}
> > makeuse.org
>


Re: what a plava !

2020-03-27 Thread Sean Owen
- dev@, which is more for project devs to communicate. Cross-posting
is discouraged too.

The book isn't from the Spark OSS project, so not really the place to
give feedback here.

I don't quite understand the context of your other questions, but
would elaborate them in individual, clear emails instead to increase
the chance that someone will answer.

On Fri, Mar 27, 2020 at 4:49 PM Zahid Rahman  wrote:
>
>
> I was very impressed with the amount of material available from
> https://github.com/databricks/Spark-The-Definitive-Guide/
> over 450 megabytes.
>
> I have corrected the Scala code by adding
> .sort(desc("sum(total_cost)")) to the code provided on page 34 (see below).
>
> I have noticed numerous uses of exclamation marks, almost to the point of
> overuse. For example:
> page 23: Let's specify some more transformations !
> page 24: you've read your first explain plan !
> page 26: Notice that these plans compile to the exact same underlying plan !
> page 29: The last step is our action !
> page 34: The best thing about structured streaming rapidly... with
> virtually no code
>
> 1. I have never read a science book with such an emotion of frustration.
> Is Spark, already difficult to understand, made more complicated by the
> proliferation of languages: Scala, Java, Python, SQL, R?
>
> 2. Secondly, is the Spark architecture made more complex due to competing
> technologies?
>
> I have a Spark cluster set up with a master and a slave to load-balance
> heavy activity, like so:
> sbin/start-master.sh
> sbin/start-slave.sh spark://192.168.0.38:7077
> For load balancing, I imagine (conceptually speaking, although I haven't
> tried it), I can have as many slaves (workers) on other physical machines
> as I like, simply by downloading the Spark zip file and running workers
> from those other physical machine(s) with
> sbin/start-slave.sh spark://192.168.0.38:7077.
> My question is: under the circumstances, do I need to bother with Mesos
> or YARN?
>
> Collins dictionary
> The exclamation mark is used after exclamations and emphatic expressions.
>
> I can’t believe it!
> Oh, no! Look at this mess!
>
> The exclamation mark loses its effect if it is overused. It is better to use 
> a full stop after a sentence expressing mild excitement or humour.
>
> It was such a beautiful day.
> I felt like a perfect banana.
>
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{window,column,desc,col}
>
> object RetailData {
>
>   def main(args: Array[String]): Unit = {
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
> Data").getOrCreate();
>
> // create a static frame
>   val staticDataFrame = spark.read.format("csv")
> .option ("header","true")
> .option("inferschema","true")
> .load("/data/retail-data/by-day/*.csv")
>
> staticDataFrame.createOrReplaceTempView("retail_data")
> val staticFrame = staticDataFrame.schema
>
> staticDataFrame
>   .selectExpr(
> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>   .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>   .sum("total_cost")
>   .sort(desc("sum(total_cost)"))
>   .show(1)
>
>   } // main
>
> } // object
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org




what a plava !

2020-03-27 Thread Zahid Rahman
I was very impressed with the amount of material available from
https://github.com/databricks/Spark-The-Definitive-Guide/
*over 450 megabytes.*

I have corrected the Scala code by adding
*.sort(desc("sum(total_cost)"))* to the code provided on page 34 (see
below).

I have noticed numerous uses of exclamation marks, almost to the point of
overuse. For example:
page 23: Let's specify some more *transformations !*
page 24: you've read your first explain *plan !*
page 26: Notice that these plans compile to the exact same underlying *plan !*
page 29: The last step is our *action !*
page 34: The best thing about structured streaming rapidly... with
*virtually no code*

1. I have never read a science book with such an emotion of frustration.
Is Spark, already difficult to understand, made more complicated by the
proliferation of languages: Scala, Java, Python, SQL, R?

2. Secondly, is the Spark architecture made more complex due to competing
technologies?

I have a Spark cluster set up with a master and a slave to load-balance
heavy activity, like so:
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077
For load balancing, I imagine (conceptually speaking, although I haven't
tried it), I can have as many slaves (workers) on other physical machines
as I like, simply by downloading the Spark zip file and running workers
from those other physical machine(s) with
sbin/start-slave.sh spark://192.168.0.38:7077.

*My question is: under the circumstances, do I need to bother with Mesos
or YARN?*

Collins dictionary
The exclamation mark is used after exclamations and emphatic expressions.

   - I can’t believe it!
   - Oh, no! Look at this mess!

The exclamation mark loses its effect if it is overused. It is better to
use a full stop after a sentence expressing mild excitement or humour.

   It was such a beautiful day.
   I felt like a perfect banana.


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

val spark =
SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail
Data").getOrCreate();

// create a static frame
  val staticDataFrame = spark.read.format("csv")
.option ("header","true")
.option("inferschema","true")
.load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
val staticFrame = staticDataFrame.schema

staticDataFrame
  .selectExpr(
"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")
  .sort(desc("sum(total_cost)"))
  .show(1)

  } // main

} // object



Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: spark structured streaming GroupState returns weird values from state

2020-03-27 Thread Jungtaek Lim
Could you play with Encoders.bean()? You can call Encoders.bean() with your
class, and call .schema() on the return value to see how it translates to
a Spark SQL schema. The schema must be consistent across multiple JVM
runs for this to work properly, but I suspect the field order isn't retained.
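
(For illustration, a minimal, self-contained sketch of that check. The bean
below is a hypothetical two-field stand-in for the real state class, not the
original code; printing the derived schema on each JVM shows whether the
field order is stable.)

import org.apache.spark.sql.Encoders;
import java.io.Serializable;

public class BeanSchemaCheck {

    // Hypothetical stand-in bean with a subset of the real state fields.
    public static class ProductStateInformation implements Serializable {
        private String requestId;
        private long firstEventTimeMillis;

        public String getRequestId() { return requestId; }
        public void setRequestId(String requestId) { this.requestId = requestId; }
        public long getFirstEventTimeMillis() { return firstEventTimeMillis; }
        public void setFirstEventTimeMillis(long value) { firstEventTimeMillis = value; }
    }

    public static void main(String[] args) {
        // Encoders.bean() derives the schema reflectively from the getters/setters;
        // the printed field order is the order Spark uses when reading state back.
        System.out.println(Encoders.bean(ProductStateInformation.class).schema().treeString());
    }
}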

On Fri, Mar 27, 2020 at 10:28 PM Srinivas V  wrote:

> I am listening to a Kafka topic with a structured streaming application in
> Java, testing it on my local Mac.
> When I retrieve the GroupState object back with
> state.get(), it gives some random values for the fields in the object:
> some are interchanged, some are defaults, and some are junk values.
>
> See this example below:
> While setting the state, I am setting:
> ProductSessionInformation{requestId='222112345', productId='222112345',
> priority='0', firstEventTimeMillis=1585312384,
> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
> numberOfEvents=1}
>
> When I retrieve it back, it comes like this:
> ProductSessionInformation{requestId='some junk characters are coming here'
> productId='222112345', priority='222112345',
> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>
> Any clue why it might be happening? I have been stuck on this for a
> couple of days. Immediate help is appreciated.
>
> code snippet:
>
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>
>  @Override
> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator,
> GroupState<ProductStateInformation> state) throws Exception {
>
>
>
>   if (state.hasTimedOut()) {
>
> //
>
> }else{
>
> if (state.exists()) {
> ProductStateInformation oldSession = state.get();
> System.out.println("State for productId:"+productId + " with old values 
> "+oldSession);
>
> }
>
>
> public class EventsApp implements Serializable{
>
> public void run(String[] args) throws Exception {
>
> ...
>
>
> Dataset<Row> dataSet = sparkSession
> .readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost")
> .option("startingOffsets","latest")
> .option("failOnDataLoss", "false")
> .option("subscribe", "topic1,topic2")
> .option("includeTimestamp", true)
>
> .load();
>
>  eventsDS.groupByKey(
> new MapFunction<Event, String>() {
> @Override public String call(Event event) {
> return event.getProductId();
> }
> }, Encoders.STRING())
> .mapGroupsWithState(
> new StateUpdateTask(3),
> Encoders.bean(ProductSessionInformation.class),
> Encoders.bean(ProductSessionUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> ...
>
>
> StreamingQuery query = productUpdates
> .writeStream()
> .foreach(new ForeachWriter<ProductSessionUpdate>() {
> @Override
> public boolean open(long l, long l1) {return true;}
>
> @Override
> public void process(ProductSessionUpdate productSessionUpdate) {
> logger.info("-> query process: "+ productSessionUpdate);
> }
>
> @Override
> public void close(Throwable throwable) {}
> })
> .outputMode("update")
> .option("checkpointLocation", checkpointDir)
> .start();
>
> query.awaitTermination();
>
> }
>
> public class ProductStateInformation implements Serializable {
>
> protected String requestId;
> protected String productId;
> protected String priority;
> protected long firstEventTimeMillis;
> protected long lastEventTimeMillis;
> protected long firstReceivedTimeMillis;
> protected int numberOfEvents;
>
> ...//getter setters
>
> }
>
> These are the versions I am using:
>
> 2.3.1
> 2.4.3
>
> 2.6.60.10.2.0
>
> 3.0.3
>
>


spark structured streaming GroupState returns weird values from state

2020-03-27 Thread Srinivas V
I am listening to a Kafka topic with a structured streaming application in
Java, testing it on my local Mac.
When I retrieve the GroupState object back with
state.get(), it gives some random values for the fields in the object:
some are interchanged, some are defaults, and some are junk values.

See this example below:
While setting the state, I am setting:
ProductSessionInformation{requestId='222112345', productId='222112345',
priority='0', firstEventTimeMillis=1585312384,
lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
numberOfEvents=1}

When I retrieve it back, it comes like this:
ProductSessionInformation{requestId='some junk characters are coming here'
productId='222112345', priority='222112345',
firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
firstReceivedTimeMillis=1585312384, numberOfEvents=1}

Any clue why it might be happening? I have been stuck on this for a couple
of days. Immediate help is appreciated.

code snippet:


public class StateUpdateTask implements
MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {

 @Override
public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator,
GroupState<ProductStateInformation> state) throws Exception {



  if (state.hasTimedOut()) {

//

}else{

if (state.exists()) {
ProductStateInformation oldSession = state.get();
System.out.println("State for productId:"+productId + " with old
values "+oldSession);

}


public class EventsApp implements Serializable{

public void run(String[] args) throws Exception {

...


Dataset<Row> dataSet = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost")
.option("startingOffsets","latest")
.option("failOnDataLoss", "false")
.option("subscribe", "topic1,topic2")
.option("includeTimestamp", true)

.load();

 eventsDS.groupByKey(
new MapFunction<Event, String>() {
@Override public String call(Event event) {
return event.getProductId();
}
}, Encoders.STRING())
.mapGroupsWithState(
new StateUpdateTask(3),
Encoders.bean(ProductSessionInformation.class),
Encoders.bean(ProductSessionUpdate.class),
GroupStateTimeout.ProcessingTimeTimeout());

...


StreamingQuery query = productUpdates
.writeStream()
.foreach(new ForeachWriter<ProductSessionUpdate>() {
@Override
public boolean open(long l, long l1) {return true;}

@Override
public void process(ProductSessionUpdate productSessionUpdate) {
logger.info("-> query process: "+ productSessionUpdate);
}

@Override
public void close(Throwable throwable) {}
})
.outputMode("update")
.option("checkpointLocation", checkpointDir)
.start();

query.awaitTermination();

}

public class ProductStateInformation implements Serializable {

protected String requestId;
protected String productId;
protected String priority;
protected long firstEventTimeMillis;
protected long lastEventTimeMillis;
protected long firstReceivedTimeMillis;
protected int numberOfEvents;

...//getter setters

}

These are the versions I am using:

2.3.1
2.4.3

2.6.60.10.2.0

3.0.3


Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-slave.sh spark://192.168.0.38:7077
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-master.sh

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 06:12, Zahid Rahman  wrote:

> sbin/start-master.sh
> sbin/start-slave.sh spark://192.168.0.38:7077
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:
>
>> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
>> just include Spark dependency in IntelliJ?
>>
>> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman 
>> wrote:
>>
>>> I have configured  in IntelliJ as external jars
>>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>>
>>> not pulling anything from maven.
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>>
>>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>>
 Which Spark/Scala version do you use?

 On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
 wrote:

>
> with the following sparksession configuration
>
> val spark = SparkSession.builder().master("local[*]").appName("Spark 
> Session take").getOrCreate();
>
> this line works
>
> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>
> however, if I change the master URL like so, with the IP address, then the
> following error is produced, depending on the position of .take(5):
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
>
> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
> instance of java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in 
> instance
> of org.apache.spark.rdd.MapPartitionsRDD
>
> BUT if I remove take(5), change the position of take(5), or insert an
> extra take(5) as illustrated in the code, then it works. I don't see why
> the position of take(5) should cause such an error, or why it should be
> caused by changing the master URL.
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
> + 5))
>flights.show(5)
>
>
> complete code if you wish to replicate it.
>
> import org.apache.spark.sql.SparkSession
>
> object sessiontest {
>
>   // define specific  data type class then manipulate it using the filter 
> and map functions
>   // this is also known as an Encoder
>   case class flight (DEST_COUNTRY_NAME: String,
>  ORIGIN_COUNTRY_NAME:String,
>  count: BigInt)
>
>
>   def main(args:Array[String]): Unit ={
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
> import spark.implicits._
> val flightDf = 
> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> val flights = flightDf.as[flight]
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME 
> != "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>flights.show(5)
>
>   } // main
> }
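
(A note on the snippet above, offered as a possible explanation rather than
a confirmed diagnosis: Dataset.take(n) returns a plain local Array, so
anything chained after it runs on the driver as ordinary Scala collection
code and never ships a lambda to the executors. That may be why only the
variant whose filter/map run on the distributed Dataset hits the
SerializedLambda error against the remote master. A minimal sketch of the
difference, assuming the same flight case class and flights Dataset:)

// take(5) collects five rows to the driver; the filter below operates on a
// local Array[flight] and involves no executors at all.
val firstFive: Array[flight] = flights.take(5)
val localResult = firstFive.filter(_.ORIGIN_COUNTRY_NAME != "Canada")

// Filtering the Dataset first keeps the work distributed, so the lambda must
// be deserialized on the executors before the five rows come back.
val distributedResult = flights.filter(_.ORIGIN_COUNTRY_NAME != "Canada").take(5)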
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>



Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:

> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
> just include Spark dependency in IntelliJ?
>
> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:
>
>> I have configured  in IntelliJ as external jars
>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>
>> not pulling anything from maven.
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>
>>> Which Spark/Scala version do you use?
>>>
>>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>>> wrote:
>>>

 with the following sparksession configuration

 val spark = SparkSession.builder().master("local[*]").appName("Spark 
 Session take").getOrCreate();

 this line works

 flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)


 however, if I change the master URL like so, with the IP address, then the
 following error is produced, depending on the position of .take(5):

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();


 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
 instance of java.lang.invoke.SerializedLambda to field
 org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
 of org.apache.spark.rdd.MapPartitionsRDD

 BUT if I remove take(5), change the position of take(5), or insert an
 extra take(5) as illustrated in the code, then it works. I don't see why
 the position of take(5) should cause such an error, or why it should be
 caused by changing the master URL.

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
 + 5))
flights.show(5)


 complete code if you wish to replicate it.

 import org.apache.spark.sql.SparkSession

 object sessiontest {

   // define specific  data type class then manipulate it using the filter 
 and map functions
   // this is also known as an Encoder
   case class flight (DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME:String,
  count: BigInt)


   def main(args:Array[String]): Unit ={

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();

 import spark.implicits._
 val flightDf = 
 spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
 val flights = flightDf.as[flight]

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
 fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
flights.show(5)

   } // main
 }





 Backbutton.co.uk
 ¯\_(ツ)_/¯
 ♡۶Java♡۶RMI ♡۶
 Make Use Method {MUM}
 makeuse.org
 

>>>


Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Wenchen Fan
Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
just include Spark dependency in IntelliJ?

On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:

> I have configured  in IntelliJ as external jars
> spark-3.0.0-preview2-bin-hadoop2.7/jar
>
> not pulling anything from maven.
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>
>> Which Spark/Scala version do you use?
>>
>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>> wrote:
>>
>>>
>>> with the following sparksession configuration
>>>
>>> val spark = SparkSession.builder().master("local[*]").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> this line works
>>>
>>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>
>>> however, if I change the master URL like so, with the IP address, then the
>>> following error is produced, depending on the position of .take(5):
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>>
>>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
>>> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>>> instance of java.lang.invoke.SerializedLambda to field
>>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>>> of org.apache.spark.rdd.MapPartitionsRDD
>>>
>>> BUT if I remove take(5), change the position of take(5), or insert an
>>> extra take(5) as illustrated in the code, then it works. I don't see why
>>> the position of take(5) should cause such an error, or why it should be
>>> caused by changing the master URL.
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>>> 5))
>>>flights.show(5)
>>>
>>>
>>> complete code if you wish to replicate it.
>>>
>>> import org.apache.spark.sql.SparkSession
>>>
>>> object sessiontest {
>>>
>>>   // define specific  data type class then manipulate it using the filter 
>>> and map functions
>>>   // this is also known as an Encoder
>>>   case class flight (DEST_COUNTRY_NAME: String,
>>>  ORIGIN_COUNTRY_NAME:String,
>>>  count: BigInt)
>>>
>>>
>>>   def main(args:Array[String]): Unit ={
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> import spark.implicits._
>>> val flightDf = 
>>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>>> val flights = flightDf.as[flight]
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>>flights.show(5)
>>>
>>>   } // main
>>> }
>>>
>>>
>>>
>>>
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>