Never mind. It got resolved after I removed extra two getter methods (to
calculate duration) I created in my State specific Java bean
(ProductSessionInformation). But I am surprised why it has created so much
problem. I guess when this bean is converted to Scala class it may not be
taking care of non getter methods of the fields defined? Still how is that
causing the state object get corrupt so much?

On Sat, Mar 28, 2020 at 7:46 PM Srinivas V <> wrote:

> Ok, I will try to create some simple code to reproduce, if I can. Problem
> is that I am adding this code in an existing big project with several
> dependencies with spark streaming older version(2.2) on root level etc.
> Also, I observed that there is @Experimental on GroupState class. What
> state is it in now? Several people using this feature in prod?
> On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim <>
> wrote:
>> I have't heard known issue for this - that said, this may require new
>> investigation which is not possible or require huge effort without simple
>> reproducer.
>> Contributors (who are basically volunteers) may not want to struggle to
>> reproduce from your partial information - I'd recommend you to spend your
>> time to help volunteers starting from simple reproducer, if you are stuck
>> at it and have to resolve it.
>> Could you please get rid of the business logic which you may want to
>> redact, and provide full of source code which reproduces the bug?
>> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V <> wrote:
>>> Sorry for typos , correcting them below
>>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V <> wrote:
>>>> Sorry I was just changing some names not to send exact names. Please
>>>> ignore that. I am really struggling with this since couple of days. Can
>>>> this happen due to
>>>> 1. some of the values being null or
>>>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>>>> 3. Not enough memory ?
>>>> BTW, I am using same names in my code.
>>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>>>> wrote:
>>>>> Well, the code itself doesn't seem to be OK - you're using
>>>>> ProductStateInformation as the class of State whereas you provide
>>>>> ProductSessionInformation to Encoder for State.
>>>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>>>>> wrote:
>>>>>> Could you play with Encoders.bean()? You can Encoders.bean() with
>>>>>> your class, and call .schema() with the return value to see how it
>>>>>> transforms to the schema in Spark SQL. The schema must be consistent 
>>>>>> across
>>>>>> multiple JVM runs to make it work properly, but I suspect it doesn't 
>>>>>> retain
>>>>>> the order.
>>>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <>
>>>>>> wrote:
>>>>>>> I am listening to Kafka topic with a structured streaming
>>>>>>> application with Java,  testing it on my local Mac.
>>>>>>> When I retrieve back GroupState<ProductSessionInformation> object
>>>>>>> with state.get(), it is giving some random values for the fields in the
>>>>>>> object, some are interchanging some are default and some are junk 
>>>>>>> values.
>>>>>>> See this example below:
>>>>>>> While setting I am setting:
>>>>>>> ProductSessionInformation{requestId='222112345',
>>>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>>>> numberOfEvents=1}
>>>>>>> When I retrieve it back, it comes like this:
>>>>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>>>>> here' productId='222112345', priority='222112345',
>>>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>>> Any clue why it might be happening? I am stuck with this for couple
>>>>>>> of days. Immediate help is appreciated.
>>>>>>> code snippet:
>>>>>>> public class StateUpdateTask implements 
>>>>>>> MapGroupsWithStateFunction<String, Event, ProductStateInformation, 
>>>>>>> ProductSessionUpdate> {
>>>>>>>  @Override
>>>>>>> public ProductSessionUpdate call(String productId, Iterator<Event> 
>>>>>>> eventsIterator, GroupState<ProductStateInformation> state) throws 
>>>>>>> Exception {
>>>>>>>     {
>>>>>>>   if (state.hasTimedOut()) {
>>>>>>> //....
>>>>>>> }else{
>>>>>>> if (state.exists()) {
>>>>>>>     ProductStateInformation oldSession = state.get();
>>>>>>>     System.out.println("State for productId:"+productId + " with old 
>>>>>>> values "+oldSession);
>>>>>>> }
>>>>>>> public class EventsApp implements Serializable{
>>>>>>> public void run(String[] args) throws Exception {
>>>>>>> ...
>>>>>>> Dataset<Row> dataSet = sparkSession
>>>>>>>         .readStream()
>>>>>>>         .format("kafka")
>>>>>>>         .option("kafka.bootstrap.servers", "localhost")
>>>>>>>         .option("startingOffsets","latest")
>>>>>>>         .option("failOnDataLoss", "false")
>>>>>>>         .option("subscribe", "topic1,topic2")
>>>>>>>         .option("includeTimestamp", true)
>>>>>>>         .load();
>>>>>>>  eventsDS.groupByKey(
>>>>>>>                 new MapFunction<Event, String>() {
>>>>>>>                     @Override public String call(Event event) {
>>>>>>>                         return event.getProductId();
>>>>>>>                     }
>>>>>>>                 }, Encoders.STRING())
>>>>>>>         .mapGroupsWithState(
>>>>>>>                 new StateUpdateTask(30000),
>>>>>>>                 Encoders.bean(ProductSessionInformation.class),
>>>>>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>>>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>> ...
>>>>>>> StreamingQuery query = productUpdates
>>>>>>>         .writeStream()
>>>>>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>>>>>             @Override
>>>>>>>             public boolean open(long l, long l1) {return true;}
>>>>>>>             @Override
>>>>>>>             public void process(ProductSessionUpdate 
>>>>>>> productSessionUpdate) {
>>>>>>>       "-----> query process: "+ 
>>>>>>> productSessionUpdate);
>>>>>>>             }
>>>>>>>             @Override
>>>>>>>             public void close(Throwable throwable) {}
>>>>>>>         })
>>>>>>>         .outputMode("update")
>>>>>>>         .option("checkpointLocation", checkpointDir)
>>>>>>>         .start();
>>>>>>> query.awaitTermination();
>>>>>>> }
>>>>>>> public class ProductStateInformation implements Serializable {
>>>>>>>     protected String requestId;
>>>>>>>     protected String productId;
>>>>>>>     protected String priority;
>>>>>>>     protected long firstEventTimeMillis;
>>>>>>>     protected long lastEventTimeMillis;
>>>>>>>     protected long firstReceivedTimeMillis;
>>>>>>>     protected int numberOfEvents;
>>>>>>> ...//getter setters
>>>>>>> }
>>>>>>> These are are the versions I am using:
>>>>>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>>>>>> <spark.version>2.4.3</spark.version>
>>>>>>> <jackson.version>2.6.6</jackson.version><kafka.version></kafka.version>
>>>>>>> <kryo.version>3.0.3</kryo.version>

Reply via email to