Re: Spark Streaming Code

2020-03-28 Thread Jungtaek Lim
To get any meaningful answers you may want to provide as much
information/context as possible, e.g. the Spark version, which
behavior/output was expected (and why you think so), and how it actually
behaves.

On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj  wrote:

> Hi Team,
>
> Need help with the windowing & watermark concepts.  This code is not working
> as expected.
>
> package com.jiomoney.streaming
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.ProcessingTime
>
> object SlingStreaming {
>   def main(args: Array[String]): Unit = {
> val spark = SparkSession
>   .builder()
>   .master("local[*]")
>   .appName("Coupons_ViewingNow")
>   .getOrCreate()
>
> import spark.implicits._
>
> val checkpoint_path = "/opt/checkpoints/"
>
> val ks = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafka.replica.fetch.max.bytes", "16777216")
>   .load()
>
> val dfDeviceid = ks
>   .withColumn("val", ($"value").cast("string"))
>   .withColumn("count1", get_json_object(($"val"), "$.a"))
>   .withColumn("deviceId", get_json_object(($"val"), "$.b"))
>   .withColumn("timestamp", current_timestamp())
>
>
> val final_ids = dfDeviceid
>   .withColumn("processing_time", current_timestamp())
>   .withWatermark("processing_time","1 minutes")
>   .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
>   .agg(sum($"count1") as "total")
>
> val t = final_ids
>   .select(to_json(struct($"*")) as "value")
>   .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "sub_topic")
>   .option("checkpointLocation", checkpoint_path)
>   .outputMode("append")
>   .trigger(ProcessingTime("1 seconds"))
>   .start()
>
> t.awaitTermination()
>
>   }
>
> }
>
>
> Thanks
>
>


Re: Spark structured streaming sinks output late

2020-03-28 Thread Siva Samraj
Yes, I am also facing the same issue. Did you figure it out?

On Tue, 9 Jul 2019, 7:25 pm Kamalanathan Venkatesan, <
kamalanatha...@in.ey.com> wrote:

> Hello,
>
>
>
> I have the below Spark Structured Streaming code and I was expecting the
> results to be printed on the console every 10 seconds. But I notice the
> sink to the console happening only every ~2 minutes or more.
>
> What could be the issue?
>
>
>
> def streaming(): Unit = {
>   System.setProperty("hadoop.home.dir", "/Documents/ ")
>   val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
>   conf.set("spark.eventLog.enabled", "false");
>   val sc: SparkContext = new SparkContext(conf)
>   val sqlcontext = new SQLContext(sc)
>   val spark = SparkSession.builder().config(conf).getOrCreate()
>
>   import sqlcontext.implicits._
>   import org.apache.spark.sql.functions.window
>
>   val inputDf = spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "wonderful")
>     .option("startingOffsets", "latest")
>     .load()
>
>   import scala.concurrent.duration._
>
>   val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
>     .withWatermark("timestamp", "500 milliseconds")
>     .groupBy(
>       window($"timestamp", "10 seconds")).count()
>
>   val consoleOutput = personJsonDf.writeStream
>     .outputMode("complete")
>     .format("console")
>     .option("truncate", "false")
>     .outputMode(OutputMode.Update())
>     .start()
>
>   consoleOutput.awaitTermination()
> }
>
> object SparkExecutor {
>   val spE: SparkExecutor = new SparkExecutor();
>
>   def main(args: Array[String]): Unit = {
>     println("test")
>     spE.streaming
>   }
> }
>
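A likely factor in the delay described above is that no trigger is set: without one, Structured Streaming starts the next micro-batch as soon as the previous one finishes, so the console cadence follows the batch duration rather than the 10-second window (the window only controls how rows are grouped). A minimal sketch of the same pipeline with an explicit processing-time trigger and a single output mode - the local broker and topic name are carried over from the message above as assumptions, and this is an illustration rather than the original code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger

object WindowedConsoleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[8]")
      .appName("Histogram")
      .getOrCreate()
    import spark.implicits._

    // Same source as in the thread: a local Kafka topic (assumed to exist).
    val inputDf = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "wonderful")
      .option("startingOffsets", "latest")
      .load()

    // 10-second tumbling windows on the Kafka record timestamp.
    val counts = inputDf
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
      .withWatermark("timestamp", "500 milliseconds")
      .groupBy(window($"timestamp", "10 seconds"))
      .count()

    val consoleOutput = counts.writeStream
      .outputMode("update")                          // a single output mode
      .format("console")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("10 seconds")) // attempt a micro-batch every 10 s
      .start()

    consoleOutput.awaitTermination()
  }
}

With Trigger.ProcessingTime("10 seconds") a batch is attempted every 10 seconds; if a batch runs longer than that, the next one starts as soon as it finishes.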


Spark Streaming Code

2020-03-28 Thread Siva Samraj
Hi Team,

Need help with the windowing & watermark concepts.  This code is not working
as expected.

package com.jiomoney.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object SlingStreaming {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Coupons_ViewingNow")
  .getOrCreate()

import spark.implicits._

val checkpoint_path = "/opt/checkpoints/"

val ks = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "16777216")
  .load()

val dfDeviceid = ks
  .withColumn("val", ($"value").cast("string"))
  .withColumn("count1", get_json_object(($"val"), "$.a"))
  .withColumn("deviceId", get_json_object(($"val"), "$.b"))
  .withColumn("timestamp", current_timestamp())


val final_ids = dfDeviceid
  .withColumn("processing_time", current_timestamp())
  .withWatermark("processing_time","1 minutes")
  .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
  .agg(sum($"count1") as "total")

val t = final_ids
  .select(to_json(struct($"*")) as "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sub_topic")
  .option("checkpointLocation", checkpoint_path)
  .outputMode("append")
  .trigger(ProcessingTime("1 seconds"))
  .start()

t.awaitTermination()

  }

}


Thanks
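
A point worth keeping in mind with the code above: in append output mode a windowed aggregate is only emitted once the watermark passes the end of its window, so watermarking a column filled by current_timestamp() with "1 minutes" makes every 10-second window reach the Kafka sink roughly a minute late. Watermarks are normally applied to an event-time field taken from the payload. A hedged sketch along those lines - the JSON field "$.ts" holding an epoch-seconds event time is an assumption, not something from the original message:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SlingStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Coupons_ViewingNow")
      .getOrCreate()
    import spark.implicits._

    val ks = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .load()

    val parsed = ks
      .withColumn("val", $"value".cast("string"))
      .withColumn("count1", get_json_object($"val", "$.a").cast("long"))
      .withColumn("deviceId", get_json_object($"val", "$.b"))
      // Assumed: the payload carries an epoch-seconds event time under "$.ts".
      .withColumn("event_time", get_json_object($"val", "$.ts").cast("long").cast("timestamp"))

    val totals = parsed
      .withWatermark("event_time", "1 minutes")
      .groupBy(window($"event_time", "10 seconds"), $"deviceId")
      .agg(sum($"count1") as "total")

    val query = totals
      .select(to_json(struct($"*")) as "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "sub_topic")
      .option("checkpointLocation", "/opt/checkpoints/")
      .outputMode("append") // rows appear only after the watermark passes each window end
      .start()

    query.awaitTermination()
  }
}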


Beautiful Spark Code

2020-03-28 Thread Zahid Rahman
You will be pleased to learn that Mr. Mathew Powers has seen to my needs
and answered all my questions.

He has seen to all my needs.

Mr Powers has shut me up!!!

Mr Powers has made Google search, Stack Overflow and user@spark.apache.org
redundant.

That is all you guys and girls had to do: point me to his book.

https://leanpub.com/beautiful-spark

https://mungingdata.com/writing-beautiful-apache-spark2-code-with-scala/




On Sat, 28 Mar 2020, 16:49 Zahid Rahman,  wrote:

> Thanks for the tip!
>
> But if the first thing you come across is somebody using the trim
> function to strip away the spaces in /etc/hostnames, like so, from:
>
> 127.0.0.1 hostname local
>
> to
>
> 127.0.0.1hostnamelocal
>
> then there is a log error message showing the outcome of unnecessarily
> using the trim function.
>
> Especially when one of Spark's core functions is to read lines from
> files separated by a space or a comma.
>
> Also, have you seen the log4j.properties setting to ERROR, and in one
> case FATAL, for suppressing discrepancies?
>
> May I please draw your attention, and the attention of all in the
> community, to this page, which shows turning on compiler WARNINGS before
> releasing software, among other software best practices.
>
> “The Power of 10 — NASA’s Rules for Coding” by Riccardo Giorato
> https://link.medium.com/PUz88PIql3
>
> What impression would you have?
>
>
>
> On Sat, 28 Mar 2020, 15:50 Jeff Evans, 
> wrote:
>
>> Dude, you really need to chill. Have you ever worked with a large open
>> source project before? It seems not. Even so, insinuating there are tons of
>> bugs that were left uncovered until you came along (despite the fact that
>> the project is used by millions across many different organizations) is
>> ludicrous. Learn a little bit of humility
>>
>> If you're new to something, assume you have made a mistake rather than
>> that there is a bug. Lurk a bit more, or even do a simple Google search,
>> and you will realize Sean is a very senior committer (i.e. expert) in
>> Spark, and has been for many years. He, and everyone else participating in
>> these lists, is doing it voluntarily on their own time. They're not being
>> paid to handhold you and quickly answer to your every whim.
>>
>> On Sat, Mar 28, 2020, 10:46 AM Zahid Rahman  wrote:
>>
>>> So the schema is limited to holding only the DEFINITION of the schema. For
>>> example, as you say, the columns, i.e. first column User: Int, second
>>> column Password: String.
>>>
>>> Not the location of the source, i.e. a csv file with or without a header,
>>> or SQL DB tables.
>>>
>>> I am pleased that for once I am wrong about it being another bug, and that
>>> it was a design decision adding flexibility.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
>>> wrote:
>>>
 This is probably more of a question for the user support list, but I
 believe I understand the issue.

 Schema inside of Spark refers to the structure of the output rows; for
 example the schema for a particular dataframe could be
 (User: Int, Password: String) - two columns, the first is User of type
 Int and the second is Password of type String.

 When you pass the schema from one reader to another, you are only
 copying this structure, not all of the other options associated with the
 dataframe.
 This is usually useful when you are reading from sources with different
 options but data that needs to be read into the same structure.

 The other properties such as "format" and "options" exist independently
 of the schema. This is helpful if I were reading from both MySQL and
 a comma-separated file, for example. While the schema is the same, the
 options like ("inferSchema") do not apply to both MySQL and CSV, and
 format actually picks whether to use "JDBC" or "CSV", so copying that
 wouldn't be helpful either.

 I hope this clears things up,
 Russ

 On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman 
 wrote:

> Hi,
> version: spark-3.0.0-preview2-bin-hadoop2.7
>
> As you can see from the code :
>
> STEP 1: I create an object of type static frame which holds all the
> information for the datasource (csv files).
>
> STEP 2: Then I create a variable called staticSchema, assigning it the
> schema information from the original static data frame.
>
> STEP 3: Then I create another variable called streamingDataFrame from
> spark.readStream, and into the .schema function parameters I pass the
> object staticSchema, which is meant to hold the information for the csv
> files, including the .load(path) function etc.
>
> So then when I am creating val StreamingDataFrame and passing it
> .schema(staticSchema)
> the variable StreamingDataFrame  should have all the information.
> I should only have to call .option("maxFilePerTrigger",1) and not
> .format ("csv")
> .option("header","true").load("/data/retail-dat

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
Thanks for the tip!

But if the first thing you come across is somebody using the trim function
to strip away the spaces in /etc/hostnames, like so, from:

127.0.0.1 hostname local

to

127.0.0.1hostnamelocal

then there is a log error message showing the outcome of unnecessarily
using the trim function.

Especially when one of Spark's core functions is to read lines from
files separated by a space or a comma.

Also, have you seen the log4j.properties setting to ERROR, and in one case
FATAL, for suppressing discrepancies?

May I please draw your attention, and the attention of all in the community,
to this page, which shows turning on compiler WARNINGS before releasing
software, among other software best practices.

“The Power of 10 — NASA’s Rules for Coding” by Riccardo Giorato
https://link.medium.com/PUz88PIql3

What impression would you have?



On Sat, 28 Mar 2020, 15:50 Jeff Evans, 
wrote:

> Dude, you really need to chill. Have you ever worked with a large open
> source project before? It seems not. Even so, insinuating there are tons of
> bugs that were left uncovered until you came along (despite the fact that
> the project is used by millions across many different organizations) is
> ludicrous. Learn a little bit of humility
>
> If you're new to something, assume you have made a mistake rather than
> that there is a bug. Lurk a bit more, or even do a simple Google search,
> and you will realize Sean is a very senior committer (i.e. expert) in
> Spark, and has been for many years. He, and everyone else participating in
> these lists, is doing it voluntarily on their own time. They're not being
> paid to handhold you and quickly answer to your every whim.
>
> On Sat, Mar 28, 2020, 10:46 AM Zahid Rahman  wrote:
>
>> So the schema is limited to holding only the DEFINITION of the schema. For
>> example, as you say, the columns, i.e. first column User: Int, second
>> column Password: String.
>>
>> Not the location of the source, i.e. a csv file with or without a header,
>> or SQL DB tables.
>>
>> I am pleased that for once I am wrong about it being another bug, and that
>> it was a design decision adding flexibility.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
>> wrote:
>>
>>> This is probably more of a question for the user support list, but I
>>> believe I understand the issue.
>>>
>>> Schema inside of Spark refers to the structure of the output rows; for
>>> example the schema for a particular dataframe could be
>>> (User: Int, Password: String) - two columns, the first is User of type
>>> Int and the second is Password of type String.
>>>
>>> When you pass the schema from one reader to another, you are only
>>> copying this structure, not all of the other options associated with the
>>> dataframe.
>>> This is usually useful when you are reading from sources with different
>>> options but data that needs to be read into the same structure.
>>>
>>> The other properties such as "format" and "options" exist independently
>>> of the schema. This is helpful if I were reading from both MySQL and
>>> a comma-separated file, for example. While the schema is the same, the
>>> options like ("inferSchema") do not apply to both MySQL and CSV, and
>>> format actually picks whether to use "JDBC" or "CSV", so copying that
>>> wouldn't be helpful either.
>>>
>>> I hope this clears things up,
>>> Russ
>>>
>>> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman 
>>> wrote:
>>>
 Hi,
 version: spark-3.0.0-preview2-bin-hadoop2.7

 As you can see from the code :

 STEP 1: I create an object of type static frame which holds all the
 information for the datasource (csv files).

 STEP 2: Then I create a variable called staticSchema, assigning it the
 schema information from the original static data frame.

 STEP 3: Then I create another variable called streamingDataFrame from
 spark.readStream, and into the .schema function parameters I pass the
 object staticSchema, which is meant to hold the information for the csv
 files, including the .load(path) function etc.

 So then when I am creating val StreamingDataFrame and passing it
 .schema(staticSchema)
 the variable StreamingDataFrame  should have all the information.
 I should only have to call .option("maxFilePerTrigger",1) and not
 .format ("csv")
 .option("header","true").load("/data/retail-data/by-day/*.csv")
 Otherwise what is the point of passing .schema(staticSchema) to
 StreamingDataFrame.

 You can replicate it using the complete code below.

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{window,column,desc,col}

 object RetailData {

   def main(args: Array[String]): Unit = {

 // create spark session
 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
 Data").getOrCreate();
 // set spark runtime  configuration
 spark.conf.set("spark.sql.s
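
To make Russell's point above concrete, here is a small sketch of reusing a schema between a batch and a streaming reader - the paths and options mirror the thread, and it is only an illustration: .schema(...) carries the column structure, while format, options and load path still have to be given to the streaming reader.

import org.apache.spark.sql.SparkSession

object SchemaReuseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Retail Data").getOrCreate()

    // Batch read once, with inference, just to obtain the structure.
    val staticDF = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")
    val staticSchema = staticDF.schema

    // The streaming reader reuses the structure but still needs its own
    // format, options and path (schema inference is not allowed on file
    // streams unless spark.sql.streaming.schemaInference is enabled).
    val streamingDF = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", "1")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDF.isStreaming) // true
  }
}

Passing an explicit schema like this is the usual pattern for file streams, which is why the reader keeps schema separate from the source-specific settings.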

Re: Best Practice: Evaluate Expression from Spark DataFrame Column

2020-03-28 Thread Chetan Khatri
Is there a way to pass a column as a String to the expr function in Spark?

val sampleDF = Seq(
  (8, 1, "bat", "NUM IS NOT NULL AND FLAG!=0"),
  (64, 0, "mouse", "NUM IS NOT NULL AND FLAG!=0"),
  (-27, 1, "horse" , "NUM IS NOT NULL AND FLAG!=0"),
  (1, 0, "miki", "NUM IS NOT NULL AND FLAG!=0 AND WORD == 'MIKI'")
).toDF("num", "flag", "word", "expression")

val derivedDF = sampleDF.withColumn("status",
expr(sampleDF.col("expression").as[String].toString()))


On Fri, Mar 27, 2020 at 10:35 PM Chetan Khatri 
wrote:

> Hi Spark Users,
>
> I want to evaluate, for each row, the expression held in a dataframe
> column against the other columns of the same dataframe. Please suggest the
> best approach to deal with this without impacting the performance of the job.
>
> Thanks
>
> Sample code:
>
> val sampleDF = Seq(
>   (8, 1, "bat", "NUM IS NOT NULL AND FLAG IS NOT 0"),
>   (64, 0, "mouse", "NUM IS NOT NULL AND FLAG IS NOT 0"),
>   (-27, 1, "horse" , "NUM IS NOT NULL AND FLAG IS NOT 0"),
>   (null, 0, "miki", "NUM IS NOT NULL AND FLAG IS NOT 1 AND WORD IS 'MIKI'")
> ).toDF("num", "flag", "word", "expression")
>
> val derivedDF = sampleDF.withColumn("status", sampleDF.col("expression"))
>
>
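
One possible workaround, not taken from this thread: expr() accepts only a literal SQL string, so it cannot evaluate a different expression per row directly. If the set of distinct expression strings is small, they can be collected to the driver and folded into a chained when()/otherwise(); the strings must be valid Spark SQL (e.g. FLAG != 0 rather than FLAG IS NOT 0). A hedged sketch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{expr, lit, when}

object ExpressionColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ExprSketch").getOrCreate()
    import spark.implicits._

    val sampleDF = Seq(
      (8, 1, "bat", "NUM IS NOT NULL AND FLAG != 0"),
      (64, 0, "mouse", "NUM IS NOT NULL AND FLAG != 0"),
      (-27, 1, "horse", "NUM IS NOT NULL AND FLAG != 0"),
      (1, 0, "miki", "NUM IS NOT NULL AND FLAG != 0 AND WORD = 'MIKI'")
    ).toDF("num", "flag", "word", "expression")

    // Collect the distinct expression strings (assumed to be few).
    val expressions = sampleDF.select($"expression").distinct().as[String].collect()

    // status is the result of evaluating the row's own expression against that row.
    val status = expressions.foldLeft(lit(false)) { (acc, e) =>
      when($"expression" === e, expr(e)).otherwise(acc)
    }

    val derivedDF = sampleDF.withColumn("status", status)
    derivedDF.show(false)
  }
}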


Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Srinivas V
Ok, I will try to create some simple code to reproduce it, if I can. The
problem is that I am adding this code to an existing big project with
several dependencies, with an older Spark Streaming version (2.2) at the
root level, etc.

Also, I observed that there is @Experimental on the GroupState class. What
state is it in now? Are several people using this feature in prod?


On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim 
wrote:

> I haven't heard of a known issue for this - that said, it may require new
> investigation, which is not possible, or requires huge effort, without a
> simple reproducer.
>
> Contributors (who are basically volunteers) may not want to struggle to
> reproduce it from your partial information - I'd recommend you spend your
> time helping the volunteers by starting from a simple reproducer, if you
> are stuck on it and have to resolve it.
>
> Could you please get rid of the business logic which you may want to
> redact, and provide the full source code which reproduces the bug?
>
> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:
>
>> Sorry for the typos, correcting them below.
>>
>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>>
>>> Sorry, I was just changing some names so as not to send the exact names.
>>> Please ignore that. I have really been struggling with this for a couple
>>> of days. Can this happen due to
>>> 1. some of the values being null, or
>>> 2. a UTF8 issue? Or some serialization/deserialization issue?
>>> 3. not enough memory?
>>> BTW, I am using the same names in my code.
>>>
>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Well, the code itself doesn't seem to be OK - you're using
 ProductStateInformation as the class of State whereas you provide
 ProductSessionInformation to Encoder for State.

 On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
 kabhwan.opensou...@gmail.com> wrote:

> Could you play with Encoders.bean()? You can Encoders.bean() with your
> class, and call .schema() with the return value to see how it transforms 
> to
> the schema in Spark SQL. The schema must be consistent across multiple JVM
> runs to make it work properly, but I suspect it doesn't retain the order.
>
> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
> wrote:
>
>> I am listening to a Kafka topic with a structured streaming application
>> in Java, testing it on my local Mac.
>> When I retrieve the GroupState object back with state.get(), it is giving
>> some random values for the fields in the object: some are interchanged,
>> some are default and some are junk values.
>>
>> See this example below:
>> While setting I am setting:
>> ProductSessionInformation{requestId='222112345',
>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>> numberOfEvents=1}
>>
>> When I retrieve it back, it comes like this:
>> ProductSessionInformation{requestId='some junk characters are coming
>> here' productId='222112345', priority='222112345',
>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>
>> Any clue why it might be happening? I have been stuck with this for a
>> couple of days. Immediate help is appreciated.
>>
>> code snippet:
>>
>>
>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>
>>  @Override
>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>> {
>>
>>
>>
>>   if (state.hasTimedOut()) {
>>
>> //
>>
>> }else{
>>
>> if (state.exists()) {
>> ProductStateInformation oldSession = state.get();
>> System.out.println("State for productId:"+productId + " with old 
>> values "+oldSession);
>>
>> }
>>
>>
>> public class EventsApp implements Serializable{
>>
>> public void run(String[] args) throws Exception {
>>
>> ...
>>
>>
>> Dataset dataSet = sparkSession
>> .readStream()
>> .format("kafka")
>> .option("kafka.bootstrap.servers", "localhost")
>> .option("startingOffsets","latest")
>> .option("failOnDataLoss", "false")
>> .option("subscribe", "topic1,topic2")
>> .option("includeTimestamp", true)
>>
>> .load();
>>
>>  eventsDS.groupByKey(
>> new MapFunction() {
>> @Override public String call(Event event) {
>> return event.getProductId();
>> }
>> }, Encoders.STRING())
>> .mapGroupsWithState(
>> new StateUpdateTask(3),
>> Encoders.bean(ProductSessionInformati

Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Jungtaek Lim
I haven't heard of a known issue for this - that said, it may require new
investigation, which is not possible, or requires huge effort, without a
simple reproducer.

Contributors (who are basically volunteers) may not want to struggle to
reproduce it from your partial information - I'd recommend you spend your
time helping the volunteers by starting from a simple reproducer, if you
are stuck on it and have to resolve it.

Could you please get rid of the business logic which you may want to
redact, and provide the full source code which reproduces the bug?

On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:

> Sorry for the typos, correcting them below.
>
> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>
>> Sorry, I was just changing some names so as not to send the exact names.
>> Please ignore that. I have really been struggling with this for a couple
>> of days. Can this happen due to
>> 1. some of the values being null, or
>> 2. a UTF8 issue? Or some serialization/deserialization issue?
>> 3. not enough memory?
>> BTW, I am using the same names in my code.
>>
>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Well, the code itself doesn't seem to be OK - you're using
>>> ProductStateInformation as the class of State whereas you provide
>>> ProductSessionInformation to Encoder for State.
>>>
>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Could you play with Encoders.bean()? You can Encoders.bean() with your
 class, and call .schema() with the return value to see how it transforms to
 the schema in Spark SQL. The schema must be consistent across multiple JVM
 runs to make it work properly, but I suspect it doesn't retain the order.

 On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
 wrote:

> I am listening to a Kafka topic with a structured streaming application
> in Java, testing it on my local Mac.
> When I retrieve the GroupState object back with state.get(), it is giving
> some random values for the fields in the object: some are interchanged,
> some are default and some are junk values.
>
> See this example below:
> While setting I am setting:
> ProductSessionInformation{requestId='222112345',
> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
> numberOfEvents=1}
>
> When I retrieve it back, it comes like this:
> ProductSessionInformation{requestId='some junk characters are coming
> here' productId='222112345', priority='222112345',
> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>
> Any clue why it might be happening? I have been stuck with this for a
> couple of days. Immediate help is appreciated.
>
> code snippet:
>
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>
>  @Override
> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
> {
>
>
>
>   if (state.hasTimedOut()) {
>
> //
>
> }else{
>
> if (state.exists()) {
> ProductStateInformation oldSession = state.get();
> System.out.println("State for productId:"+productId + " with old 
> values "+oldSession);
>
> }
>
>
> public class EventsApp implements Serializable{
>
> public void run(String[] args) throws Exception {
>
> ...
>
>
> Dataset dataSet = sparkSession
> .readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost")
> .option("startingOffsets","latest")
> .option("failOnDataLoss", "false")
> .option("subscribe", "topic1,topic2")
> .option("includeTimestamp", true)
>
> .load();
>
>  eventsDS.groupByKey(
> new MapFunction() {
> @Override public String call(Event event) {
> return event.getProductId();
> }
> }, Encoders.STRING())
> .mapGroupsWithState(
> new StateUpdateTask(3),
> Encoders.bean(ProductSessionInformation.class),
> Encoders.bean(ProductSessionUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> ...
>
>
> StreamingQuery query = productUpdates
> .writeStream()
> .foreach(new ForeachWriter() {
> @Override
> public boolean open(long l, long l1) {return true;}
>
> @Override
> public void process(ProductSessionUpdate 
> productSessionUpdate) {
> logger.info("-

Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Srinivas V
Sorry for the typos, correcting them below.

On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:

> Sorry, I was just changing some names so as not to send the exact names.
> Please ignore that. I have really been struggling with this for a couple
> of days. Can this happen due to
> 1. some of the values being null, or
> 2. a UTF8 issue? Or some serialization/deserialization issue?
> 3. not enough memory?
> BTW, I am using the same names in my code.
>
> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Well, the code itself doesn't seem to be OK - you're using
>> ProductStateInformation as the class of State whereas you provide
>> ProductSessionInformation to Encoder for State.
>>
>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>>> class, and call .schema() with the return value to see how it transforms to
>>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>>> runs to make it work properly, but I suspect it doesn't retain the order.
>>>
>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V  wrote:
>>>
 I am listening to a Kafka topic with a structured streaming application
 in Java, testing it on my local Mac.
 When I retrieve the GroupState object back with state.get(), it is giving
 some random values for the fields in the object: some are interchanged,
 some are default and some are junk values.

 See this example below:
 While setting I am setting:
 ProductSessionInformation{requestId='222112345', productId='222112345',
 priority='0', firstEventTimeMillis=1585312384,
 lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
 numberOfEvents=1}

 When I retrieve it back, it comes like this:
 ProductSessionInformation{requestId='some junk characters are coming
 here' productId='222112345', priority='222112345',
 firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
 firstReceivedTimeMillis=1585312384, numberOfEvents=1}

 Any clue why it might be happening? I have been stuck with this for a
 couple of days. Immediate help is appreciated.

 code snippet:


 public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {

  @Override
 public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
 {



   if (state.hasTimedOut()) {

 //

 }else{

 if (state.exists()) {
 ProductStateInformation oldSession = state.get();
 System.out.println("State for productId:"+productId + " with old 
 values "+oldSession);

 }


 public class EventsApp implements Serializable{

 public void run(String[] args) throws Exception {

 ...


 Dataset dataSet = sparkSession
 .readStream()
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost")
 .option("startingOffsets","latest")
 .option("failOnDataLoss", "false")
 .option("subscribe", "topic1,topic2")
 .option("includeTimestamp", true)

 .load();

  eventsDS.groupByKey(
 new MapFunction() {
 @Override public String call(Event event) {
 return event.getProductId();
 }
 }, Encoders.STRING())
 .mapGroupsWithState(
 new StateUpdateTask(3),
 Encoders.bean(ProductSessionInformation.class),
 Encoders.bean(ProductSessionUpdate.class),
 GroupStateTimeout.ProcessingTimeTimeout());

 ...


 StreamingQuery query = productUpdates
 .writeStream()
 .foreach(new ForeachWriter() {
 @Override
 public boolean open(long l, long l1) {return true;}

 @Override
 public void process(ProductSessionUpdate productSessionUpdate) 
 {
 logger.info("-> query process: "+ 
 productSessionUpdate);
 }

 @Override
 public void close(Throwable throwable) {}
 })
 .outputMode("update")
 .option("checkpointLocation", checkpointDir)
 .start();

 query.awaitTermination();

 }

 public class ProductStateInformation implements Serializable {

 protected String requestId;
 protected String productId;
 protected String priority;
 protected long firstEventTimeMillis;
 protected long lastEventTimeMillis;
 protected long firstReceivedTimeMillis;
 protected int number

Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Srinivas V
Sorry, I was just changing some names so as not to send the exact names.
Please ignore that. I have really been struggling with this for a couple of
days. Can this happen due to
1. some of the values being null, or
2. a UTF8 issue? Or some serialization/deserialization issue?
3. not enough memory?
I am using the same names in my code.

On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim 
wrote:

> Well, the code itself doesn't seem to be OK - you're using
> ProductStateInformation as the class of State whereas you provide
> ProductSessionInformation to Encoder for State.
>
> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>> class, and call .schema() with the return value to see how it transforms to
>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>> runs to make it work properly, but I suspect it doesn't retain the order.
>>
>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V  wrote:
>>
>>> I am listening to a Kafka topic with a structured streaming application
>>> in Java, testing it on my local Mac.
>>> When I retrieve the GroupState object back with state.get(), it is giving
>>> some random values for the fields in the object: some are interchanged,
>>> some are default and some are junk values.
>>>
>>> See this example below:
>>> While setting I am setting:
>>> ProductSessionInformation{requestId='222112345', productId='222112345',
>>> priority='0', firstEventTimeMillis=1585312384,
>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>> numberOfEvents=1}
>>>
>>> When I retrieve it back, it comes like this:
>>> ProductSessionInformation{requestId='some junk characters are coming
>>> here' productId='222112345', priority='222112345',
>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>
>>> Any clue why it might be happening? I have been stuck with this for a
>>> couple of days. Immediate help is appreciated.
>>>
>>> code snippet:
>>>
>>>
>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>
>>>  @Override
>>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>> {
>>>
>>>
>>>
>>>   if (state.hasTimedOut()) {
>>>
>>> //
>>>
>>> }else{
>>>
>>> if (state.exists()) {
>>> ProductStateInformation oldSession = state.get();
>>> System.out.println("State for productId:"+productId + " with old values 
>>> "+oldSession);
>>>
>>> }
>>>
>>>
>>> public class EventsApp implements Serializable{
>>>
>>> public void run(String[] args) throws Exception {
>>>
>>> ...
>>>
>>>
>>> Dataset dataSet = sparkSession
>>> .readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "localhost")
>>> .option("startingOffsets","latest")
>>> .option("failOnDataLoss", "false")
>>> .option("subscribe", "topic1,topic2")
>>> .option("includeTimestamp", true)
>>>
>>> .load();
>>>
>>>  eventsDS.groupByKey(
>>> new MapFunction() {
>>> @Override public String call(Event event) {
>>> return event.getProductId();
>>> }
>>> }, Encoders.STRING())
>>> .mapGroupsWithState(
>>> new StateUpdateTask(3),
>>> Encoders.bean(ProductSessionInformation.class),
>>> Encoders.bean(ProductSessionUpdate.class),
>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> ...
>>>
>>>
>>> StreamingQuery query = productUpdates
>>> .writeStream()
>>> .foreach(new ForeachWriter() {
>>> @Override
>>> public boolean open(long l, long l1) {return true;}
>>>
>>> @Override
>>> public void process(ProductSessionUpdate productSessionUpdate) {
>>> logger.info("-> query process: "+ productSessionUpdate);
>>> }
>>>
>>> @Override
>>> public void close(Throwable throwable) {}
>>> })
>>> .outputMode("update")
>>> .option("checkpointLocation", checkpointDir)
>>> .start();
>>>
>>> query.awaitTermination();
>>>
>>> }
>>>
>>> public class ProductStateInformation implements Serializable {
>>>
>>> protected String requestId;
>>> protected String productId;
>>> protected String priority;
>>> protected long firstEventTimeMillis;
>>> protected long lastEventTimeMillis;
>>> protected long firstReceivedTimeMillis;
>>> protected int numberOfEvents;
>>>
>>> ...//getter setters
>>>
>>> }
>>>
>>> These are are the versions I am using:
>>>
>>> 2.3.1
>>> 2.4.3
>>>
>>> 2.6.60.10.2.0
>>>
>>> 3.0.3
>>>
>>>
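
As a follow-up to Jungtaek's suggestion in this thread, here is a small sketch (written in Scala for brevity, with a stand-in bean mirroring the ProductStateInformation fields quoted above) that prints the schema the bean encoder actually derives. Comparing that output across runs, and making sure the same class is used both as the GroupState type parameter and in Encoders.bean(), is a quick way to spot the kind of field mix-up described above:

import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders

// Stand-in for the thread's state bean; real code would use the project's own class.
class ProductStateInformation extends Serializable {
  @BeanProperty var requestId: String = _
  @BeanProperty var productId: String = _
  @BeanProperty var priority: String = _
  @BeanProperty var firstEventTimeMillis: Long = _
  @BeanProperty var lastEventTimeMillis: Long = _
  @BeanProperty var firstReceivedTimeMillis: Long = _
  @BeanProperty var numberOfEvents: Int = _
}

object BeanSchemaCheck {
  def main(args: Array[String]): Unit = {
    // Print the schema Spark derives from the bean's getters/setters;
    // this is the layout the state encoder will use in mapGroupsWithState.
    Encoders.bean(classOf[ProductStateInformation]).schema.printTreeString()
  }
}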


Re: OFF TOPIC LIST CRITERIA

2020-03-28 Thread Zahid Rahman
Would it be surprising to know that I know the difference between a user
issue, a bug and a feature?

I have commercial experience and training in safety-critical software
(aerospace). Believe me, that is an entirely different world,
with entirely different standards.

Obviously, if I want an answer because I am impatient, then that is what the
list is for.
Otherwise why have a mailing list at all - just google it or read a book.

Even though I am only beginning to get to know Apache Spark,
I actually have six books on the subject, so I prepared in advance not to
rely on anyone.

This guy wants people only to ask those questions which are of interest to
him.
Sadly there are people in IT who, if they know a subject, think they have a
moral authority too.
The two do not go hand in hand.



Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Sat, 28 Mar 2020 at 07:13, ayan guha  wrote:

> Hi
>
> Though Sean requested a non-reply, this is sincerely crazy.
>
> @Zahid - what Sean said is actually mentioned on Spark's website:
>
>    - u...@spark.incubator.apache.org -- for usage questions, help, and
>      announcements (subscribe, archives)
>    - d...@spark.incubator.apache.org -- for people who want to contribute
>      code to Spark (subscribe, archives)
>
>
> Most users will probably want the User list, but individuals interested in
> contributing code to the project should also subscribe to the Dev list.
>
> Also, in ASF - https://www.apache.org/dev/contrib-email-tips
>
> There is nothing wrong with using the appropriate mailing lists, and
> nothing wrong if someone points that out to you.
>
>
> Given that I can vouch for @Sean Owen 's contribution in the
> user group (and especially around CI/CD), your claims, and the tone in
> which they are made, are both sadly hilarious. Let us refrain from hurting
> each other in these otherwise troubled times.
>
> Best
>
> Ayan
>
>
>
>
> On Sat, Mar 28, 2020 at 2:11 PM Zahid Rahman  wrote:
>
>>
>> OK, user support - user@ is DONE !!!
>>
>> I actually reported a workaround for an existing bug to the experienced
>> user, and "the experienced user" was "not aware" of the setting in
>> log4j.properties, so he learned something new too.
>> Clearly neither were you.
>>
>> Also, it may surprise some people, but there are people who have been
>> formally trained in software development.
>> We can tell a self-trained one a mile away.
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Sat, 28 Mar 2020 at 03:02, Sean Owen  wrote:
>>
>>> BCC user, dev, and I encourage others to not reply.
>>>
>>> I said _dev@_ is not for user support. user@ is. You heard that
>>> yesterday, too, and not to cross-post.
>>> You actually got answers to several questions, despite their tone,
>>> from experienced developers of the project.
>>>
>>> Messages like yours are, I assure you, not useful to _anybody_. If we
>>> let people talk like this on big community lists, yes _that_ will put
>>> up barriers.
>>> So, the answer for you is: you are not using either of these lists
>>> appropriately right now. If you can keep it civil and on-topic, use
>>> user@.
>>> Otherwise we will block you from the lists.
>>>
>>>
>>> Sean
>>>
>>> On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman 
>>> wrote:
>>> >
>>> >
>>> > Sean Owen says the criterion of these two mailing lists is not to help
>>> > support somebody who is new, but to serve people who have been using
>>> > the software for a long time.
>>> >
>>> > He is implying I think that I should only send email when I find bugs
>>> so that I can help him in his work.
>>> > A one way street.
>>> >
>>> > He is suggesting the more familiar you are with this software the more
>>> important you are.
>>> > Some kind of Alpha male type heirachy.
>>> >
>>> > He wants to put a barrier in place where Apache foundation wants no
>>> barriers to free learning and free software.
>>> >
>>> > He has not reported any bugs while I have reported so many in such a
>>> short space of time.
>>> > He has warned me as well
>>> >
>>> > So that Sean Owen does not put a barrier in place for me in my path to
>>> free learning and free  Apache software
>>> > I would like somebody to clarify the criteria for me.
>>> >
>>> >
>>> > Backbutton.co.uk
>>> > ¯\_(ツ)_/¯
>>> > ♡۶Java♡۶RMI ♡۶
>>> > Make Use Method {MUM}
>>> > makeuse.org
>>>
>>
>
> --
> Best Regards,
> Ayan Guha
>


Re: OFF TOPIC LIST CRITERIA

2020-03-28 Thread ayan guha
Hi

Though Sean requested a non-reply, this is sincerely crazy.

@Zahid - what Sean said is actually mentioned on Spark's website:

   - u...@spark.incubator.apache.org -- for usage questions, help, and
     announcements (subscribe, archives)
   - d...@spark.incubator.apache.org -- for people who want to contribute
     code to Spark (subscribe, archives)

Most users will probably want the User list, but individuals interested in
contributing code to the project should also subscribe to the Dev list.

Also, in ASF - https://www.apache.org/dev/contrib-email-tips

There is nothing wrong with using the appropriate mailing lists, and
nothing wrong if someone points that out to you.


Given that I can vouch for @Sean Owen 's contribution in the
user group (and especially around CI/CD), your claims, and the tone in
which they are made, are both sadly hilarious. Let us refrain from hurting
each other in these otherwise troubled times.

Best

Ayan




On Sat, Mar 28, 2020 at 2:11 PM Zahid Rahman  wrote:

>
> OK, user support - user@ is DONE !!!
>
> I actually reported a workaround for an existing bug to the experienced
> user, and "the experienced user" was "not aware" of the setting in
> log4j.properties, so he learned something new too.
> Clearly neither were you.
>
> Also, it may surprise some people, but there are people who have been
> formally trained in software development.
> We can tell a self-trained one a mile away.
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Sat, 28 Mar 2020 at 03:02, Sean Owen  wrote:
>
>> BCC user, dev, and I encourage others to not reply.
>>
>> I said _dev@_ is not for user support. user@ is. You heard that
>> yesterday, too, and not to cross-post.
>> You actually got answers to several questions, despite their tone,
>> from experienced developers of the project.
>>
>> Messages like yours are, I assure you, not useful to _anybody_. If we
>> let people talk like this on big community lists, yes _that_ will put
>> up barriers.
>> So, the answer for you is: you are not using either of these lists
>> appropriately right now. If you can keep it civil and on-topic, use
>> user@.
>> Otherwise we will block you from the lists.
>>
>>
>> Sean
>>
>> On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman 
>> wrote:
>> >
>> >
>> > Sean Owen says the criterion of these two mailing lists is not to help
>> > support somebody who is new, but to serve people who have been using
>> > the software for a long time.
>> >
>> > He is implying I think that I should only send email when I find bugs
>> so that I can help him in his work.
>> > A one way street.
>> >
>> > He is suggesting the more familiar you are with this software the more
>> important you are.
>> > Some kind of Alpha male type heirachy.
>> >
>> > He wants to put a barrier in place where Apache foundation wants no
>> barriers to free learning and free software.
>> >
>> > He has not reported any bugs while I have reported so many in such a
>> short space of time.
>> > He has warned me as well
>> >
>> > So that Sean Owen does not put a barrier in place for me in my path to
>> free learning and free  Apache software
>> > I would like somebody to clarify the criteria for me.
>> >
>> >
>> > Backbutton.co.uk
>> > ¯\_(ツ)_/¯
>> > ♡۶Java♡۶RMI ♡۶
>> > Make Use Method {MUM}
>> > makeuse.org
>>
>

-- 
Best Regards,
Ayan Guha