[ https://issues.apache.org/jira/browse/SPARK-43503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Varun Arora updated SPARK-43503:
--------------------------------
    Priority: Critical  (was: Major)

> Deserialisation Failure on State Store Schema Evolution (Spark Structured Streaming)
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-43503
>                 URL: https://issues.apache.org/jira/browse/SPARK-43503
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.1
>            Reporter: Varun Arora
>            Priority: Critical
>
> In a streaming query, state is persisted in RocksDB via the mapGroupsWithState function. We use Encoders.bean to serialise the state and store it in the State Store. Code snippet:
> {code:java}
> df
>     .groupByKey((MapFunction<Row, String>) event -> event.getAs("stateGroupingId"), Encoders.STRING())
>     .mapGroupsWithState(mapGroupsWithStateFunction,
>         Encoders.bean(StateInfo.class), Encoders.bean(StateOutput.class),
>         GroupStateTimeout.ProcessingTimeTimeout());
> {code}
> As per the example above, the StateInfo bean holds the state information that is stored in the State Store. However, after adding or removing a field on the StateInfo bean and re-running the query, we get a deserialisation exception. Is there a way to handle this scenario, or to plug in custom deserialisation that copes with schema evolution? (A sketch of one possible workaround is appended at the end of this message.)
> Exception:
> {code:java}
> Stack: [0x000070000cd0a000,0x000070000ce0a000],  sp=0x000070000ce08400,  free space=1017k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> V  [libjvm.dylib+0x57c2b5]  Unsafe_GetLong+0x55
> J 8700  sun.misc.Unsafe.getLong(Ljava/lang/Object;J)J (0 bytes) @ 0x000000010fe9e6be [0x000000010fe9e600+0xbe]
> j  org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J+5
> j  org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.pointTo(Ljava/lang/Object;JI)V+2
> j  org.apache.spark.sql.catalyst.expressions.UnsafeMapData.pointTo(Ljava/lang/Object;JI)V+187
> j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.getMap(I)Lorg/apache/spark/sql/catalyst/expressions/UnsafeMapData;+52
> j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.getMap(I)Lorg/apache/spark/sql/catalyst/util/MapData;+2
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.MapObjects_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificSafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)Lorg/apache/spark/sql/catalyst/util/ArrayData;+53
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.StaticInvoke_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificSafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/util/Map;+14
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.initializeJavaBean_0_1$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificSafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/example/streaming/StateInfo;)V+2
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Ljava/lang/Object;)Ljava/lang/Object;+74
> j  org.apache.spark.sql.execution.ObjectOperator$.$anonfun$deserializeRowToObject$1(Lorg/apache/spark/sql/catalyst/expressions/package$Projection;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;+2
> j  org.apache.spark.sql.execution.ObjectOperator$$$Lambda$2600.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
> j  org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getStateObject(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)Ljava/lang/Object;+9
> j  org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getState(Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)Lorg/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper$StateData;+16
> j  org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.$anonfun$processNewData$1(Lorg/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec$InputProcessor;Lscala/Tuple2;)Lscala/collection/GenTraversableOnce;+45
> j  org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor$$Lambda$3237.apply(Ljava/lang/Object;)Ljava/lang/Object;+8
> J 5928 C2 scala.collection.Iterator$$anon$11.hasNext()Z (35 bytes) @ 0x000000010e9a6a58 [0x000000010e9a6620+0x438]
> j  org.apache.spark.util.CompletionIterator.hasNext()Z+4
> j  scala.collection.Iterator$ConcatIterator.hasNext()Z+22
> j  org.apache.spark.util.CompletionIterator.hasNext()Z+4
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 5819 C1 scala.collection.Iterator$$anon$12.hasNext()Z (60 bytes) @ 0x000000010eb53e44 [0x000000010eb53ce0+0x164]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> J 2875 C1 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x000000010f1264cc [0x000000010f1263c0+0x10c]
> j  org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(ZILscala/collection/Iterator;)Lscala/collection/Iterator;+199
> j  org.apache.spark.sql.execution.SparkPlan$$Lambda$3096.apply(Ljava/lang/Object;)Ljava/lang/Object;+12
> j  org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(Lscala/Function1;Lorg/apache/spark/TaskContext;ILscala/collection/Iterator;)Lscala/collection/Iterator;+2
> j  org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;+7
> j  org.apache.spark.rdd.RDD$$Lambda$2500.apply(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;+13
> j  org.apache.spark.rdd.MapPartitionsRDD.compute(Lorg/apache/spark/Partition;Lorg/apache/spark/TaskContext;)Lscala/collection/Iterator;+27
> j  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(Lorg/apache/spark/Partition;Lorg/apache/spark/TaskContext;)Lscala/collection/Iterator;+26
> j  org.apache.spark.rdd.RDD.iterator(Lorg/apache/spark/Partition;Lorg/apache/spark/TaskContext;)Lscala/collection/Iterator;+42
> j  org.apache.spark.scheduler.ResultTask.runTask(Lorg/apache/spark/TaskContext;)Ljava/lang/Object;+203
> j  org.apache.spark.scheduler.Task.run(JILorg/apache/spark/metrics/MetricsSystem;Lscala/collection/immutable/Map;Lscala/Option;)Ljava/lang/Object;+226
> j  org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Lorg/apache/spark/executor/Executor$TaskRunner;Lscala/runtime/BooleanRef;)Ljava/lang/Object;+36
> j  org.apache.spark.executor.Executor$TaskRunner$$Lambda$2458.apply()Ljava/lang/Object;+8
> j  org.apache.spark.util.Utils$.tryWithSafeFinally(Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
> j  org.apache.spark.executor.Executor$TaskRunner.run()V+443
> j  java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V+95
> j  java.util.concurrent.ThreadPoolExecutor$Worker.run()V+5
> j  java.lang.Thread.run()V+11
> v  ~StubRoutines::call_stub
> V  [libjvm.dylib+0x2d4795]  JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)+0x771
> V  [libjvm.dylib+0x2d3460]  JavaCalls::call_virtual(JavaValue*, KlassHandle, Symbol*, Symbol*, JavaCallArguments*, Thread*)+0x14a
> V  [libjvm.dylib+0x2d367b]  JavaCalls::call_virtual(JavaValue*, Handle, KlassHandle, Symbol*, Symbol*, Thread*)+0x57
> V  [libjvm.dylib+0x341cf5]  thread_entry(JavaThread*, Thread*)+0x78
> V  [libjvm.dylib+0x564226]  JavaThread::thread_main_inner()+0x82
> V  [libjvm.dylib+0x5640eb]  JavaThread::run()+0x19d
> V  [libjvm.dylib+0x48f359]  java_start(Thread*)+0xfa
> C  [libsystem_pthread.dylib+0x61d3]  _pthread_start+0x7d
> C  [libsystem_pthread.dylib+0x1bd3]  thread_start+0xf
> C  0x0000000000000000
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
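Possible workaround (a sketch added here, not part of the original report): until bean-encoded state supports schema evolution, the state can be kept in the store as a plain JSON string and (de)serialised by hand with Jackson, which can be told to ignore removed fields and to leave added fields at their Java defaults. The names StateInfo, StateOutput, df and "stateGroupingId" are reused from the snippet above; the Jackson dependency and the no-arg constructors on both beans are assumptions, not something the report confirms.

{code:java}
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.GroupStateTimeout;

// The state is persisted as a JSON string, so its stored schema stays fixed
// even when fields are added to or removed from StateInfo.
MapGroupsWithStateFunction<String, Row, String, StateOutput> evolvableFunction =
    (key, events, state) -> {
        // Build the mapper inside the function so the closure stays serialisable.
        ObjectMapper mapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // tolerate removed fields

        // Deserialise the previous state by hand; fields added later keep their
        // Java defaults, fields removed later are silently ignored.
        StateInfo info = state.exists()
            ? mapper.readValue(state.get(), StateInfo.class)
            : new StateInfo();

        // ... update `info` from `events` exactly as the existing
        // mapGroupsWithStateFunction does ...

        state.update(mapper.writeValueAsString(info)); // persist back as JSON
        return new StateOutput();
    };

df.groupByKey((MapFunction<Row, String>) event -> event.getAs("stateGroupingId"),
        Encoders.STRING())
    .mapGroupsWithState(evolvableFunction,
        Encoders.STRING(),                  // state encoder is now a single, stable string column
        Encoders.bean(StateOutput.class),
        GroupStateTimeout.ProcessingTimeTimeout());
{code}

The trade-off is that the stored state is no longer a typed struct, and switching an existing query from Encoders.bean(StateInfo.class) to Encoders.STRING() changes the state schema one more time, so the existing checkpoint would have to be discarded once when adopting this approach.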