[ https://issues.apache.org/jira/browse/SPARK-43503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Varun Arora updated SPARK-43503:
--------------------------------
    Priority: Critical  (was: Major)

> Deserialisation Failure on State Store Schema Evolution (Spark Structured Streaming)
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-43503
>                 URL: https://issues.apache.org/jira/browse/SPARK-43503
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.1
>            Reporter: Varun Arora
>            Priority: Critical
>
> In a streaming query, state is persisted in RocksDB via the mapGroupsWithState
> function. We use Encoders.bean to serialise the state and store it in the
> State Store. Code snippet:
>  
> {code:java}
> df
>     .groupByKey(
>         (MapFunction<Row, String>) event -> event.getAs("stateGroupingId"),
>         Encoders.STRING())
>     .mapGroupsWithState(
>         mapGroupsWithStateFunction,
>         Encoders.bean(StateInfo.class),
>         Encoders.bean(StateOutput.class),
>         GroupStateTimeout.ProcessingTimeTimeout());
> {code}
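> For reference, a simplified sketch of the kind of state bean and state
> function used above (the field names here are illustrative, not our actual
> schema; StateOutput construction is elided):
> {code:java}
> import java.io.Serializable;
> import java.util.Map;
> import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
> import org.apache.spark.sql.Row;
> 
> // Illustrative state bean: Encoders.bean derives the State Store schema from
> // the getters/setters, so adding or removing a field changes that schema.
> public class StateInfo implements Serializable {
>     private String lastEventId;             // hypothetical field
>     private Map<String, Long> countsByType; // hypothetical field (a map field
>                                             // shows up as UnsafeMapData below)
> 
>     public String getLastEventId() { return lastEventId; }
>     public void setLastEventId(String v) { lastEventId = v; }
>     public Map<String, Long> getCountsByType() { return countsByType; }
>     public void setCountsByType(Map<String, Long> v) { countsByType = v; }
> }
> 
> MapGroupsWithStateFunction<String, Row, StateInfo, StateOutput> mapGroupsWithStateFunction =
>     (key, events, state) -> {
>         StateInfo info = state.exists() ? state.get() : new StateInfo();
>         // ... fold the events for this key into info ...
>         state.update(info);
>         return new StateOutput(); // populated from key/info in real code
>     };
> {code}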
> As per the above example, the StateInfo bean holds the state information that
> is stored in the State Store. However, after adding or removing a field on the
> StateInfo bean and re-running the query, we get a deserialisation failure. Is
> there a way to handle this scenario, or to plug in custom deserialisation to
> handle schema evolution?
> Exception:
> {code:java}
> Stack: [0x000070000cd0a000,0x000070000ce0a000],  sp=0x000070000ce08400,  free space=1017k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> V  [libjvm.dylib+0x57c2b5]  Unsafe_GetLong+0x55
> J 8700  sun.misc.Unsafe.getLong(Ljava/lang/Object;J)J (0 bytes) @ 0x000000010fe9e6be [0x000000010fe9e600+0xbe]
> j  org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J+5
> j  org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.pointTo(Ljava/lang/Object;JI)V+2
> j  org.apache.spark.sql.catalyst.expressions.UnsafeMapData.pointTo(Ljava/lang/Object;JI)V+187
> j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.getMap(I)Lorg/apache/spark/sql/catalyst/expressions/UnsafeMapData;+52
> j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.getMap(I)Lorg/apache/spark/sql/catalyst/util/MapData;+2
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.MapObjects_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificSafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)Lorg/apache/spark/sql/catalyst/util/ArrayData;+53
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.StaticInvoke_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificSafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/util/Map;+14
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.initializeJavaBean_0_1$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificSafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/example/streaming/StateInfo;)V+2
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Ljava/lang/Object;)Ljava/lang/Object;+74
> j  org.apache.spark.sql.execution.ObjectOperator$.$anonfun$deserializeRowToObject$1(Lorg/apache/spark/sql/catalyst/expressions/package$Projection;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;+2
> j  org.apache.spark.sql.execution.ObjectOperator$$$Lambda$2600.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
> j  org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getStateObject(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)Ljava/lang/Object;+9
> j  org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getState(Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)Lorg/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper$StateData;+16
> j  org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.$anonfun$processNewData$1(Lorg/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec$InputProcessor;Lscala/Tuple2;)Lscala/collection/GenTraversableOnce;+45
> j  org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor$$Lambda$3237.apply(Ljava/lang/Object;)Ljava/lang/Object;+8
> J 5928 C2 scala.collection.Iterator$$anon$11.hasNext()Z (35 bytes) @ 0x000000010e9a6a58 [0x000000010e9a6620+0x438]
> j  org.apache.spark.util.CompletionIterator.hasNext()Z+4
> j  scala.collection.Iterator$ConcatIterator.hasNext()Z+22
> j  org.apache.spark.util.CompletionIterator.hasNext()Z+4
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 5819 C1 scala.collection.Iterator$$anon$12.hasNext()Z (60 bytes) @ 0x000000010eb53e44 [0x000000010eb53ce0+0x164]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> j  org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(ZILscala/collection/Iterator;)Lscala/collection/Iterator;+199
> j  org.apache.spark.sql.execution.SparkPlan$$Lambda$3096.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
> j  org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(Lscala/Function1;Lorg/apache/spark/TaskContext;ILscala/collection/Iterator;)Lscala/collection/Iterator;+2
> j  org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;+7
> j  org.apache.spark.rdd.RDD$$Lambda$2500.apply(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;+13
> j  org.apache.spark.rdd.MapPartitionsRDD.compute(Lorg/apache/spark/Partition;Lorg/apache/spark/TaskContext;)Lscala/collection/Iterator;+27
> j  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(Lorg/apache/spark/Partition;Lorg/apache/spark/TaskContext;)Lscala/collection/Iterator;+26
> j  org.apache.spark.rdd.RDD.iterator(Lorg/apache/spark/Partition;Lorg/apache/spark/TaskContext;)Lscala/collection/Iterator;+42
> j  org.apache.spark.scheduler.ResultTask.runTask(Lorg/apache/spark/TaskContext;)Ljava/lang/Object;+203
> j  org.apache.spark.scheduler.Task.run(JILorg/apache/spark/metrics/MetricsSystem;Lscala/collection/immutable/Map;Lscala/Option;)Ljava/lang/Object;+226
> j  org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Lorg/apache/spark/executor/Executor$TaskRunner;Lscala/runtime/BooleanRef;)Ljava/lang/Object;+36
> j  org.apache.spark.executor.Executor$TaskRunner$$Lambda$2458.apply()Ljava/lang/Object;+8
> j  org.apache.spark.util.Utils$.tryWithSafeFinally(Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
> j  org.apache.spark.executor.Executor$TaskRunner.run()V+443
> j  java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V+95
> j  java.util.concurrent.ThreadPoolExecutor$Worker.run()V+5
> j  java.lang.Thread.run()V+11
> v  ~StubRoutines::call_stub
> V  [libjvm.dylib+0x2d4795]  JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)+0x771
> V  [libjvm.dylib+0x2d3460]  JavaCalls::call_virtual(JavaValue*, KlassHandle, Symbol*, Symbol*, JavaCallArguments*, Thread*)+0x14a
> V  [libjvm.dylib+0x2d367b]  JavaCalls::call_virtual(JavaValue*, Handle, KlassHandle, Symbol*, Symbol*, Thread*)+0x57
> V  [libjvm.dylib+0x341cf5]  thread_entry(JavaThread*, Thread*)+0x78
> V  [libjvm.dylib+0x564226]  JavaThread::thread_main_inner()+0x82
> V  [libjvm.dylib+0x5640eb]  JavaThread::run()+0x19d
> V  [libjvm.dylib+0x48f359]  java_start(Thread*)+0xfa
> C  [libsystem_pthread.dylib+0x61d3]  _pthread_start+0x7d
> C  [libsystem_pthread.dylib+0x1bd3]  thread_start+0xf
> C  0x0000000000000000
> {code}
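> One workaround we are evaluating (a sketch only, not validated; the Jackson
> usage and version handling are our own, not a Spark API) is to keep the
> encoder schema fixed by storing the state as a versioned JSON payload and
> doing the evolution handling ourselves:
> {code:java}
> import java.io.IOException;
> import java.io.Serializable;
> import com.fasterxml.jackson.databind.DeserializationFeature;
> import com.fasterxml.jackson.databind.ObjectMapper;
> 
> // Wrapper bean: the schema the State Store sees is fixed (one int + one
> // String), so the real StateInfo fields can evolve freely inside the JSON.
> public class VersionedState implements Serializable {
>     private int version;
>     private String payloadJson;
> 
>     public int getVersion() { return version; }
>     public void setVersion(int v) { version = v; }
>     public String getPayloadJson() { return payloadJson; }
>     public void setPayloadJson(String v) { payloadJson = v; }
> }
> 
> ObjectMapper mapper = new ObjectMapper()
>     // fields written by a newer StateInfo version are ignored on replay
>     .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
> 
> StateInfo decode(VersionedState vs) throws IOException {
>     // branch on vs.getVersion() here if a rename/retype needs migration logic
>     return mapper.readValue(vs.getPayloadJson(), StateInfo.class);
> }
> 
> VersionedState encode(StateInfo info) throws IOException {
>     VersionedState vs = new VersionedState();
>     vs.setVersion(1); // bump when StateInfo changes incompatibly
>     vs.setPayloadJson(mapper.writeValueAsString(info));
>     return vs;
> }
> {code}
> The query would then pass Encoders.bean(VersionedState.class) as the state
> encoder and call decode/encode around state.get()/state.update(), at the cost
> of keeping all evolution logic in user code.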


