+1,我也遇到过类似的 fd 泄露问题。注意在 close 的时候先把 buffer 中的数据刷盘,然后关闭资源,并 cancel 掉尚未完成的 future。
Best Regards, Ran Tao Weihua Hu <huweihua....@gmail.com> 于2023年3月8日周三 16:52写道: > Hi, > > 通过代码看作业在Failover 时的确会有 HBaseClient 的资源泄露。 > > 在 HbaseDimensionAsyncFunc 中重写一下 close 方法,释放掉 HBaseClient。 > > Best, > Weihua > > > On Wed, Mar 8, 2023 at 4:19 PM aiden <18765295...@163.com> wrote: > > > Hi > > 我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下: > > 2023-03-08 16:15:39 > > org.jboss.netty.channel.ChannelException: Failed to create a selector. > > at > > > org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343) > > at > > > org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100) > > at > > > org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52) > > at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45) > > at > > > org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45) > > at > > > org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28) > > at > > > org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143) > > at > > > org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81) > > at > > > org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39) > > at > org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707) > > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507) > > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496) > > at > > > com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37) > > at > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) > > at > > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) > > at > > > 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214) > > at > > > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) > > at > > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > > at > > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: java.io.IOException: Too many open files > > at sun.nio.ch.IOUtil.makePipe(Native Method) > > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65) > > at sun.nio.ch > > .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) > > at java.nio.channels.Selector.open(Selector.java:227) > > at > > > org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63) > > at > > > org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341) > > ... 25 more > > 对当前程序使用文件描述符数量进行监控,发现当程序抛出如下错误自动重启后,程序使用文件描述符数量激增。错误日志如下 > > java.io.IOException: Could not perform checkpoint 5 for operator async > > wait operator (2/9)#0. 
> > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) > > at > > > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) > > at org.apache.flink.streaming.runtime.io > > > .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > > at org.apache.flink.streaming.runtime.io > > 
.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > > at > > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > > at > > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > > at > > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could > > not complete snapshot 5 for operator async wait operator (2/9)#0. Failure > > reason: Checkpoint was declined. 
> > at > > > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269) > > at > > > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) > > at > > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345) > > at > > > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) > > at > > > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) > > at > > > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) > > at > > > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726) > > at > > > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226) > > ... 
22 more > > Caused by: java.util.ConcurrentModificationException > > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442) > > at java.util.HashMap$EntryIterator.next(HashMap.java:1476) > > at java.util.HashMap$EntryIterator.next(HashMap.java:1474) > > at > > > com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156) > > at > > > com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21) > > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) > > at > > > com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131) > > at > > > com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22) > > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) > > at > > > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308) > > at > > > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115) > > at > > > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37) > > at > > > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106) > > at > > > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46) > > at > > > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75) > > at > > > org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65) > > at > > > org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79) > > at > > > org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77) > > at > > > org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36) > > at > > > 
org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77) > > at > > > org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230) > > at > > > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230) > > ... 33 more > > > > 对文件描述符分析后发现,发现绝大部分文件描述符如下(占比:15.4k/15.9k),怀疑是程序重启后之前的文件描述符没有释放 > > COMMAND PID USER FD TYPE DEVICE SIZE/OFF > NODE > > NAME > > java 168258 yarn *648u a_inode 0,10 0 > 8670 > > [eventpoll] > > java 168258 yarn *652r FIFO 0,9 0t0 > 850723636 > > pipe > > java 168258 yarn *653w FIFO 0,9 0t0 > 850723636 > > pipe > > java 168258 yarn *654u a_inode 0,10 0 > 8670 > > [eventpoll] > > java 168258 yarn *655r FIFO 0,9 0t0 > 850723637 > > pipe > > java 168258 yarn *656w FIFO 0,9 0t0 > 850723637 > > pipe > > java 168258 yarn *657u a_inode 0,10 0 > 8670 > > [eventpoll] > > java 168258 yarn *658r FIFO 0,9 0t0 > 850723638 > > pipe > > java 168258 yarn *659w FIFO 0,9 0t0 > 850723638 > > pipe > > java 168258 yarn *660u a_inode 0,10 0 > 8670 > > [eventpoll] > > java 168258 yarn *661w FIFO 0,9 0t0 > 850723639 > > pipe > > > > flink版本:1.16.0 > > asynchbase版本:1.8.2 > > 附POM: > > <dependency> > > <groupId>org.hbase</groupId> > > <artifactId>asynchbase</artifactId> > > <version>1.8.2</version> > > <exclusions> > > <exclusion> > > <groupId>org.slf4j</groupId> > > <artifactId>*</artifactId> > > </exclusion> > > </exclusions> > > </dependency> > > > > 关键代码如下: > > SingleOutputStreamOperator asyncFunc = AsyncDataStream > > .orderedWaitWithRetry(source, new > > HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry()) > > .setParallelism(9) > > .uid("asyncFunc"); > > > > public class HbaseDimensionAsyncFunc extends > > RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, > > ArrayList<HashMap<String, String>>> { > > HBaseClient client = null; > > > > @Override > > public void open(Configuration 
configuration) throws Exception { > > super.open(configuration); > > log.warn("==========创建hbase客户端=========="); > > client = new HBaseClient(PropUtils.getValue("bigdata.hosts")); > > } > > > > @Override > > public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, > > String>>> o, ResultFuture<ArrayList<HashMap<String, String>>> > resultFuture) > > throws Exception { > > > > client.get(new GetRequest(HBASE_TABLE, > > uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> { > > // 业务代码 > > }); > > } > > } > > > > >