I am using the AvroToPojo Malhar operator in conjunction with
AvroFileInputOperator for converting the avro records to POJO. While doing
the testing for application's stability, I found that AvroToPojo opwerator
doesn't recover in case of failure and keeps throwing below exception. This
in turn makes the whole application unstable and hence to be killed

The field for which it throws error 'ActiveFieldInfo' is a static inner
class and I am not sure on what can be done to have the operator recover
itself without any trouble. 

Any pointers on this issue will be really helpful

2017-05-30 17:15:46,826 INFO  stram.StreamingContainerParent
(StreamingContainerParent.java:log(170)) - child msg: deploy request failed:
[OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
0,
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=brdn2244.target.com]]],
OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
0,
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing
no-arg constructor): com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
Serialization trace:
columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
        at
com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at
com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
        at 
com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
        at
com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
        at
com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
        at
com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
        at
com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
        at
com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
 context:
PTContainer[id=1(container_e21_1491404336779_1770158_01_000018),state=ACTIVE,operators=[PTOperator[id=2,name=fileReader$avroToPojo,state=PENDING_DEPLOY],
PTOperator[id=1,name=fileReader$fileReader,state=PENDING_DEPLOY]]]
2017-05-30 17:15:46,832 INFO  stram.StreamingContainerParent
(StreamingContainerParent.java:log(170)) - child msg:
java.lang.IllegalStateException: Deploy request failed:
[OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
0,
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=brdn2244.target.com]]],
OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
0,
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
        at
com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:836)
        at
com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
        at
com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created
(missing no-arg constructor):
com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
Serialization trace:
columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
        at
com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at
com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
        at 
com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
        at
com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
        at
com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
        at
com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
        ... 2 more


Regards
Vivek



--
View this message in context: 
http://apache-apex-users-list.78494.x6.nabble.com/AvroToPojo-Operator-doesn-t-recover-after-failure-and-keeps-throwing-Kryo-exception-tp1660.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.

Reply via email to