I haven't heard of a known issue like this. That said, this may require a
new investigation, which is either not possible or requires a huge effort
without a simple reproducer.

Contributors (who are basically volunteers) may not want to struggle to
reproduce the problem from partial information. If you are stuck and need
this resolved, I'd recommend spending your time on a simple reproducer
that volunteers can start from.

Could you please strip out the business logic you want to redact, and
provide the full source code that reproduces the bug?
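
As a starting point for such a reproducer, here is a minimal sketch that
needs only the JDK, no Spark. As far as I know, Encoders.bean() discovers
fields through JavaBean getters and setters rather than declared fields,
so listing those properties for the state class shows what the encoder
has to work with. SessionBean and BeanPropertyCheck are made-up names for
illustration; substitute your own state class.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for the state class in this thread.
class SessionBean implements Serializable {
    private String requestId;
    private long firstEventTimeMillis;
    private int numberOfEvents;

    public String getRequestId() { return requestId; }
    public void setRequestId(String requestId) { this.requestId = requestId; }
    public long getFirstEventTimeMillis() { return firstEventTimeMillis; }
    public void setFirstEventTimeMillis(long v) { this.firstEventTimeMillis = v; }
    public int getNumberOfEvents() { return numberOfEvents; }
    public void setNumberOfEvents(int n) { this.numberOfEvents = n; }
}

class BeanPropertyCheck {
    // Returns property name -> type for every property that has both a getter
    // and a setter, in the order the JDK Introspector reports them. That order
    // need not match the order in which the fields are declared.
    static Map<String, String> propertyTypes(Class<?> beanClass) {
        try {
            // Stop at Object so the synthetic "class" property is excluded.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Map<String, String> result = new LinkedHashMap<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    result.put(pd.getName(), pd.getPropertyType().getSimpleName());
                }
            }
            return result;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        propertyTypes(SessionBean.class)
                .forEach((name, type) -> System.out.println(name + ": " + type));
    }
}
```

Running the same check on both ProductStateInformation and
ProductSessionInformation and comparing the output should show quickly
whether the two classes differ.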

On Sat, Mar 28, 2020 at 8:11 PM Srinivas V <srini....@gmail.com> wrote:

> Sorry for the typos, correcting them below
>
> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V <srini....@gmail.com> wrote:
>
>> Sorry, I was just changing some names so as not to send the exact names.
>> Please ignore that. I have been really struggling with this for a couple
>> of days. Can this happen due to
>> 1. some of the values being null,
>> 2. a UTF-8 issue, or some serialization/deserialization issue, or
>> 3. not enough memory?
>> BTW, I am using the same names in my code.
>>
>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Well, the code itself doesn't seem to be OK - you're using
>>> ProductStateInformation as the class of State whereas you provide
>>> ProductSessionInformation to Encoder for State.
>>>
>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> Could you play with Encoders.bean()? You can call Encoders.bean() with
>>>> your class, and call .schema() on the return value to see how the class
>>>> is mapped to a Spark SQL schema. The schema must be consistent across
>>>> multiple JVM runs to work properly, but I suspect it doesn't retain the
>>>> field order.
>>>>
>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <srini....@gmail.com>
>>>> wrote:
>>>>
>>>>> I am consuming a Kafka topic with a Structured Streaming application
>>>>> written in Java, testing it on my local Mac.
>>>>> When I retrieve the GroupState<ProductSessionInformation> object with
>>>>> state.get(), the fields contain seemingly random values: some are
>>>>> swapped with each other, some are defaults, and some are junk.
>>>>>
>>>>> See the example below.
>>>>> This is what I set:
>>>>> ProductSessionInformation{requestId='222112345',
>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>> numberOfEvents=1}
>>>>>
>>>>> When I retrieve it back, it comes like this:
>>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>>> here' productId='222112345', priority='222112345',
>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>
>>>>> Any clue why this might be happening? I have been stuck on this for a
>>>>> couple of days. Immediate help is appreciated.
>>>>>
>>>>> code snippet:
>>>>>
>>>>>
>>>>> public class StateUpdateTask implements
>>>>>         MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>>>
>>>>>     @Override
>>>>>     public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator,
>>>>>             GroupState<ProductStateInformation> state) throws Exception {
>>>>>         if (state.hasTimedOut()) {
>>>>>             //....
>>>>>         } else {
>>>>>             if (state.exists()) {
>>>>>                 ProductStateInformation oldSession = state.get();
>>>>>                 System.out.println("State for productId:" + productId
>>>>>                         + " with old values " + oldSession);
>>>>>             }
>>>>>             //....
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> public class EventsApp implements Serializable{
>>>>>
>>>>> public void run(String[] args) throws Exception {
>>>>>
>>>>> ...
>>>>>
>>>>>
>>>>> Dataset<Row> dataSet = sparkSession
>>>>>         .readStream()
>>>>>         .format("kafka")
>>>>>         .option("kafka.bootstrap.servers", "localhost")
>>>>>         .option("startingOffsets","latest")
>>>>>         .option("failOnDataLoss", "false")
>>>>>         .option("subscribe", "topic1,topic2")
>>>>>         .option("includeTimestamp", true)
>>>>>
>>>>>         .load();
>>>>>
>>>>>  eventsDS.groupByKey(
>>>>>                 new MapFunction<Event, String>() {
>>>>>                     @Override public String call(Event event) {
>>>>>                         return event.getProductId();
>>>>>                     }
>>>>>                 }, Encoders.STRING())
>>>>>         .mapGroupsWithState(
>>>>>                 new StateUpdateTask(30000),
>>>>>                 Encoders.bean(ProductSessionInformation.class),
>>>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> ...
>>>>>
>>>>>
>>>>> StreamingQuery query = productUpdates
>>>>>         .writeStream()
>>>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>>>             @Override
>>>>>             public boolean open(long l, long l1) {return true;}
>>>>>
>>>>>             @Override
>>>>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>>>>                 logger.info("-----> query process: " + productSessionUpdate);
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public void close(Throwable throwable) {}
>>>>>         })
>>>>>         .outputMode("update")
>>>>>         .option("checkpointLocation", checkpointDir)
>>>>>         .start();
>>>>>
>>>>> query.awaitTermination();
>>>>>
>>>>> }
>>>>>
>>>>> public class ProductStateInformation implements Serializable {
>>>>>
>>>>>     protected String requestId;
>>>>>     protected String productId;
>>>>>     protected String priority;
>>>>>     protected long firstEventTimeMillis;
>>>>>     protected long lastEventTimeMillis;
>>>>>     protected long firstReceivedTimeMillis;
>>>>>     protected int numberOfEvents;
>>>>>
>>>>> ...//getter setters
>>>>>
>>>>> }
>>>>>
>>>>> These are the versions I am using:
>>>>>
>>>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>>>> <spark.version>2.4.3</spark.version>
>>>>>
>>>>> <jackson.version>2.6.6</jackson.version>
>>>>>
>>>>> <kafka.version>0.10.2.0</kafka.version>
>>>>>
>>>>> <kryo.version>3.0.3</kryo.version>
>>>>>
>>>>>
