Could you play with Encoders.bean()? You can Encoders.bean() with your
class, and call .schema() with the return value to see how it transforms to
the schema in Spark SQL. The schema must be consistent across multiple JVM
runs to make it work properly, but I suspect it doesn't retain the order.

On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <srini....@gmail.com> wrote:

> I am listening to Kafka topic with a structured streaming application with
> Java,  testing it on my local Mac.
> When I retrieve back GroupState<ProductSessionInformation> object with
> state.get(), it is giving some random values for the fields in the object,
> some are interchanging some are default and some are junk values.
>
> See this example below:
> While setting I am setting:
> ProductSessionInformation{requestId='222112345', productId='222112345',
> priority='0', firstEventTimeMillis=1585312384,
> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
> numberOfEvents=1}
>
> When I retrieve it back, it comes like this:
> ProductSessionInformation{requestId='some junk characters are coming here'
> productId='222112345', priority='222112345',
> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>
> Any clue why it might be happening? I am stuck with this for couple of
> days. Immediate help is appreciated.
>
> code snippet:
>
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String, 
> Event, ProductStateInformation, ProductSessionUpdate> {
>
>  @Override
> public ProductSessionUpdate call(String productId, Iterator<Event> 
> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>     {
>
>
>
>   if (state.hasTimedOut()) {
>
> //....
>
> }else{
>
> if (state.exists()) {
>     ProductStateInformation oldSession = state.get();
>     System.out.println("State for productId:"+productId + " with old values 
> "+oldSession);
>
> }
>
>
> public class EventsApp implements Serializable{
>
> public void run(String[] args) throws Exception {
>
> ...
>
>
> Dataset<Row> dataSet = sparkSession
>         .readStream()
>         .format("kafka")
>         .option("kafka.bootstrap.servers", "localhost")
>         .option("startingOffsets","latest")
>         .option("failOnDataLoss", "false")
>         .option("subscribe", "topic1,topic2")
>         .option("includeTimestamp", true)
>
>         .load();
>
>  eventsDS.groupByKey(
>                 new MapFunction<Event, String>() {
>                     @Override public String call(Event event) {
>                         return event.getProductId();
>                     }
>                 }, Encoders.STRING())
>         .mapGroupsWithState(
>                 new StateUpdateTask(30000),
>                 Encoders.bean(ProductSessionInformation.class),
>                 Encoders.bean(ProductSessionUpdate.class),
>                 GroupStateTimeout.ProcessingTimeTimeout());
>
> ...
>
>
> StreamingQuery query = productUpdates
>         .writeStream()
>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>             @Override
>             public boolean open(long l, long l1) {return true;}
>
>             @Override
>             public void process(ProductSessionUpdate productSessionUpdate) {
>                 logger.info("-----> query process: "+ productSessionUpdate);
>             }
>
>             @Override
>             public void close(Throwable throwable) {}
>         })
>         .outputMode("update")
>         .option("checkpointLocation", checkpointDir)
>         .start();
>
> query.awaitTermination();
>
> }
>
> public class ProductStateInformation implements Serializable {
>
>     protected String requestId;
>     protected String productId;
>     protected String priority;
>     protected long firstEventTimeMillis;
>     protected long lastEventTimeMillis;
>     protected long firstReceivedTimeMillis;
>     protected int numberOfEvents;
>
> ...//getter setters
>
> }
>
> These are are the versions I am using:
>
> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
> <spark.version>2.4.3</spark.version>
>
> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>
> <kryo.version>3.0.3</kryo.version>
>
>

Reply via email to