boshu Zheng created FLINK-11083: ----------------------------------- Summary: CRowSerializerConfigSnapshot is not instantiable Key: FLINK-11083 URL: https://issues.apache.org/jira/browse/FLINK-11083 Project: Flink Issue Type: Bug Components: Table API & SQL, Type Serialization System Reporter: boshu Zheng Assignee: boshu Zheng
An exception was encountered when restarting a job from a savepoint in our production environment: {code:java} 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING to FAILED. java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) ... 5 more Caused by: java.lang.RuntimeException: The class 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' is not instantiable: The class has no (implicit) public nullary constructor, i.e. a constructor without arguments. 
at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218) at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) ... 
7 more {code} I added tests to CRowSerializerTest to confirm that this is indeed a bug: {code:java} @Test def testDefaultConstructor(): Unit = { new CRowSerializer.CRowSerializerConfigSnapshot() /////// This would fail the test val serializerConfigSnapshotClass = Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot") InstantiationUtil.instantiate(serializerConfigSnapshotClass) } @Test def testStateRestore(): Unit = { class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] { var state: ListState[CRow] = _ override def open(parameters: Configuration): Unit = { val stateDesc = new ListStateDescriptor[CRow]("CRow", new CRowTypeInfo(new RowTypeInfo(Types.INT))) state = getRuntimeContext.getListState(stateDesc) } override def processElement(value: Integer, ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context, out: Collector[Integer]): Unit = { state.add(new CRow(Row.of(value), true)) } } val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction) var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer]( operator, new KeySelector[Integer, Integer] { override def getKey(value: Integer): Integer= -1 }, Types.INT, 1, 1, 0) testHarness.setup() testHarness.open() testHarness.processElement(new StreamRecord[Integer](1, 1L)) testHarness.processElement(new StreamRecord[Integer](2, 1L)) testHarness.processElement(new StreamRecord[Integer](3, 1L)) assertEquals(1, numKeyedStateEntries(operator)) val snapshot = testHarness.snapshot(0L, 0L) testHarness.close() testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer]( operator, new KeySelector[Integer, Integer] { override def getKey(value: Integer): Integer= -1 }, Types.INT, 1, 1, 0) testHarness.setup() /////// This would throw the same exception as our production app does. 
testHarness.initializeState(snapshot) testHarness.open() assertEquals(1, numKeyedStateEntries(operator)) testHarness.close() } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)