boshu Zheng created FLINK-11083:
-----------------------------------

             Summary: CRowSerializerConfigSnapshot is not instantiable
                 Key: FLINK-11083
                 URL: https://issues.apache.org/jira/browse/FLINK-11083
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL, Type Serialization System
            Reporter: boshu Zheng
            Assignee: boshu Zheng


An exception was encountered when restarting a job with savepoint in our 
production env,

{code:java}
2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task   
                :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
from any of the 1 provided restore options.
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
        ... 5 more
Caused by: java.lang.RuntimeException: The class 
'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
 is not instantiable: The class has no (implicit) public nullary constructor, 
i.e. a constructor without arguments.
        at 
org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
        at 
org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
        at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
        at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
        at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
        at 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
        at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
        at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
        at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
        at 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
        at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more
{code}

I add tests to CRowSerializerTest to make sure this is definitely a bug,
{code:java}
  @Test
  def testDefaultConstructor(): Unit = {
    new CRowSerializer.CRowSerializerConfigSnapshot()

    /////// This would fail the test
    val serializerConfigSnapshotClass =
     
Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
    InstantiationUtil.instantiate(serializerConfigSnapshotClass)
  }

  @Test
  def testStateRestore(): Unit = {

    class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, 
Integer] {
      var state: ListState[CRow] = _
      override def open(parameters: Configuration): Unit = {
        val stateDesc = new ListStateDescriptor[CRow]("CRow",
          new CRowTypeInfo(new RowTypeInfo(Types.INT)))
        state = getRuntimeContext.getListState(stateDesc)
      }
      override def processElement(value: Integer,
          ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
          out: Collector[Integer]): Unit = {
        state.add(new CRow(Row.of(value), true))
      }
    }

    val operator = new KeyedProcessOperator[Integer, Integer, Integer](new 
IKeyedProcessFunction)

    var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, 
Integer, Integer](
      operator,
      new KeySelector[Integer, Integer] {
        override def getKey(value: Integer): Integer= -1
      },
      Types.INT, 1, 1, 0)
    testHarness.setup()
    testHarness.open()
    testHarness.processElement(new StreamRecord[Integer](1, 1L))
    testHarness.processElement(new StreamRecord[Integer](2, 1L))
    testHarness.processElement(new StreamRecord[Integer](3, 1L))

    assertEquals(1, numKeyedStateEntries(operator))

    val snapshot = testHarness.snapshot(0L, 0L)
    testHarness.close()

    testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, 
Integer](
      operator,
      new KeySelector[Integer, Integer] {
        override def getKey(value: Integer): Integer= -1
      },
      Types.INT, 1, 1, 0)
    testHarness.setup()

    /////// This would throw the same exception as our production app do.
    testHarness.initializeState(snapshot)

    testHarness.open()

    assertEquals(1, numKeyedStateEntries(operator))

    testHarness.close()
  }
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to