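Hi, the stack trace shows that the stream being read while restoring state had already been closed. If HDFS itself has no problem, this is probably not the root cause. Are there any other exceptions in the log?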
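A minimal sketch of why "Stream closed" points away from the root cause. It uses only the JDK (no Flink or HDFS), and the class `StreamClosedDemo` and method `readAfterClose` are hypothetical names made up for this illustration. Closing a buffered stream and then reading from it through `DataInputStream.readUnsignedByte` (the same JDK frame that appears in the trace) raises `IOException: Stream closed` in the reader. The message says nothing about who closed the stream or why. During a restore, one plausible but unverified scenario is that an earlier failure cancelled the task and its open streams were closed underneath the restore code.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical demo class: shows that "Stream closed" is raised by whoever
// reads after something else has closed the stream; it carries no root cause.
public class StreamClosedDemo {

    // Simulates a restore that reads one byte after the stream was closed.
    // Returns the exception message, or null if the read unexpectedly succeeded.
    static String readAfterClose() {
        byte[] checkpointBytes = {1, 2, 3}; // stand-in for checkpoint data
        DataInputStream in = new DataInputStream(
                new BufferedInputStream(new ByteArrayInputStream(checkpointBytes)));
        try {
            // Some other code path (assumed here: a cancellation caused by an
            // earlier failure) closes the stream first...
            in.close();
            // ...and the "restore" code then tries to read from it.
            in.readUnsignedByte();
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterClose());
    }
}
```

Running `main` prints `Stream closed`. The exception is real, but it only reports the symptom, so the log lines before it (whatever failed or cancelled the task first) are where the root cause would show up.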
Best regards,
唐云

On Apr 30, 2019 16:30, eric <erickxi...@qq.com> wrote:

Hi all,

I'm new to Flink and ran a program to test state checkpointing:
1) The data source is a socket, and the job uses keyed state; I submit the job and let it run for a while.
2) Then I close the data source's socket, and the job goes into the failed state.
3) After a few seconds, I reopen the data source socket.
4) Flink then reconnects to the socket and recovers the job, but the recovery fails: the stored MapState is not restored.

Environment:
flink: 1.8.0
Flink's Hadoop jar: flink-shaded-hadoop2-uber-2.6.5-1.8.0.jar
HDFS: hadoop2.6.0-cdh5.16.1
Running in standalone mode; the restore fails with both the filesystem and the RocksDB state backend.

Error log:
Caused by: java.io.IOException: Stream closed
	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:892)
	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:963)
	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:757)
	at java.io.FilterInputStream.read(FilterInputStream.java:83)
	at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
	at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
	at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
	at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
	at org.apache.flink.types.StringValue.readString(StringValue.java:769)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
	at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
	at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
	at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)