+1, I have run into a similar fd leak. In close(), flush the buffered data first, then close the resources, then cancel the outstanding futures.
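A minimal sketch of that close order in plain Java. BufferedLookupClient is a hypothetical stand-in, not an asynchbase or Flink class: it buffers writes in memory and tracks in-flight lookups as CompletableFutures. The point is only the sequence: flush while the connection can still send, then release the connection, then cancel lookups that can no longer be answered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical client used only to illustrate the close order; not a real asynchbase/Flink API.
class BufferedLookupClient implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();      // writes not yet sent
    private final List<String> remote = new ArrayList<>();      // stands in for HBase
    private final List<CompletableFuture<String>> inFlight = new ArrayList<>();
    private final List<String> closeSteps = new ArrayList<>();  // records what close() did
    private boolean connected = true;                           // stands in for sockets/selectors

    void put(String row) {
        if (!connected) throw new IllegalStateException("client is closed");
        buffer.add(row);
    }

    CompletableFuture<String> get(String key) {
        if (!connected) throw new IllegalStateException("client is closed");
        CompletableFuture<String> reply = new CompletableFuture<>();
        inFlight.add(reply); // completed later by a response, or cancelled on close
        return reply;
    }

    @Override
    public void close() {
        if (!connected) return; // idempotent: a second close() is a no-op
        // 1. Flush buffered writes while the connection can still send them.
        remote.addAll(buffer);
        buffer.clear();
        closeSteps.add("flush");
        // 2. Release the connection (in a real client: sockets, selectors, threads).
        connected = false;
        closeSteps.add("release");
        // 3. Cancel lookups that can no longer be answered, so no caller waits forever.
        for (CompletableFuture<String> reply : inFlight) {
            reply.cancel(false);
        }
        inFlight.clear();
        closeSteps.add("cancel");
    }

    List<String> remote() { return remote; }
    List<String> closeSteps() { return closeSteps; }

    public static void main(String[] args) {
        BufferedLookupClient client = new BufferedLookupClient();
        client.put("row-1");
        CompletableFuture<String> pending = client.get("uuid-1");
        client.close();
        System.out.println(client.remote());       // [row-1]
        System.out.println(pending.isCancelled()); // true
        System.out.println(client.closeSteps());   // [flush, release, cancel]
    }
}
```

Flushing has to come first because it needs the connection; cancelling last means no caller is left blocked on a future whose response can never arrive.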

Best Regards,
Ran Tao


Weihua Hu <huweihua....@gmail.com> wrote on Wed, Mar 8, 2023 at 16:52:

> Hi,
>
> Looking at the code, the job does indeed leak HBaseClient resources on failover.
>
> Override the close method in HbaseDimensionAsyncFunc and release the HBaseClient there.
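To make the failover leak concrete without a cluster, here is a self-contained sketch. FakeHBaseClient, DimFunction and FailoverDemo are hypothetical stand-ins: the fake client opens one java.nio Selector per worker, loosely mirroring the Netty NioWorkerPool frames under HBaseClient.<init> in the stack trace further down, and each simulated restart calls open() on a fresh function instance. In the real HbaseDimensionAsyncFunc the fix would presumably be a close() override that calls super.close() and shuts the client down (asynchbase's HBaseClient has a shutdown() method that returns a Deferred); that part is not shown because it needs the Flink and asynchbase jars.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

class FailoverDemo {
    // Simulates `restarts` failovers: each one tears down the old function instance and
    // opens a new one. Returns how many selectors opened during the run are still open.
    static long leakedSelectors(boolean releaseOnClose, int restarts) {
        long before = FakeHBaseClient.openSelectors();
        for (int i = 0; i < restarts; i++) {
            DimFunction fn = new DimFunction(releaseOnClose);
            fn.open();
            fn.close();
        }
        return FakeHBaseClient.openSelectors() - before;
    }

    public static void main(String[] args) {
        System.out.println("without release: " + leakedSelectors(false, 5)); // 20
        System.out.println("with release:    " + leakedSelectors(true, 5));  // 0
    }
}

// Hypothetical stand-in for HBaseClient: opens WORKERS selectors, each backed by OS fds.
class FakeHBaseClient {
    static final int WORKERS = 4;
    private static final List<Selector> ALL = new ArrayList<>(); // every selector ever opened
    private final List<Selector> own = new ArrayList<>();

    FakeHBaseClient() {
        try {
            for (int i = 0; i < WORKERS; i++) {
                Selector s = Selector.open();
                own.add(s);
                ALL.add(s);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Plays the role of a client shutdown: closes the selectors and frees their fds.
    void shutdown() {
        for (Selector s : own) {
            try {
                s.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    static long openSelectors() {
        return ALL.stream().filter(Selector::isOpen).count();
    }
}

// Hypothetical stand-in for the RichAsyncFunction open()/close() lifecycle.
class DimFunction {
    private final boolean releaseOnClose;
    private FakeHBaseClient client;

    DimFunction(boolean releaseOnClose) { this.releaseOnClose = releaseOnClose; }

    void open() { client = new FakeHBaseClient(); }

    void close() {
        // Without this, every restart strands WORKERS selectors (and their fds).
        if (releaseOnClose && client != null) {
            client.shutdown();
            client = null;
        }
    }
}
```

Only the teardown differs between the two runs: open() builds a new client on every restart, and unless close() releases the old one, its descriptors stay allocated for as long as the TaskManager JVM lives.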
>
> Best,
> Weihua
>
>
> On Wed, Mar 8, 2023 at 4:19 PM aiden <18765295...@163.com> wrote:
>
> > Hi
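> >   When using async HBase I frequently hit a "Too many open files" exception, and after the job restarts automatically it fails again immediately. The full error log is below: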
> > 2023-03-08 16:15:39
> > org.jboss.netty.channel.ChannelException: Failed to create a selector.
> > at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> > at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> > at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> > at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> > at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> > at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> > at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> > at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> > at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> > at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> > at com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> > at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> > at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> > at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> > at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Too many open files
> > at sun.nio.ch.IOUtil.makePipe(Native Method)
> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> > at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> > at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> > ... 25 more
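For fd monitoring from inside the JVM (for example from a metrics reporter or a periodic log line), the JDK's com.sun.management.UnixOperatingSystemMXBean reports the open and maximum descriptor counts on Unix-like HotSpot/OpenJDK builds. The sketch below (FdUsage is a hypothetical name) reads both values and measures how many descriptors a batch of Selector.open() calls costs, since that is the call that failed in the trace above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

class FdUsage {
    // Returns {open, max} descriptor counts, or null if this JVM/OS does not expose them.
    static long[] read() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {unix.getOpenFileDescriptorCount(), unix.getMaxFileDescriptorCount()};
        }
        return null;
    }

    // Opens n selectors, measures the change in open fds, then closes them again.
    // Returns -1 when the counters are unavailable.
    static long deltaForSelectors(int n) {
        long[] before = read();
        if (before == null) return -1;
        List<Selector> selectors = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) selectors.add(Selector.open());
            return read()[0] - before[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Selector s : selectors) {
                try { s.close(); } catch (IOException ignored) { }
            }
        }
    }

    public static void main(String[] args) {
        long[] usage = read();
        if (usage == null) {
            System.out.println("fd counters not available on this platform");
            return;
        }
        System.out.printf("open fds: %d / limit %d%n", usage[0], usage[1]);
        System.out.println("fds used by 10 selectors: " + deltaForSelectors(10));
    }
}
```

Plotting the first value over time is enough to see the step increase after each restart; the second value is the limit that "Too many open files" refers to.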
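> >   I monitored the number of file descriptors the job uses and found that after it throws the following error and restarts automatically, the fd count spikes. The error log is: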
> > java.io.IOException: Could not perform checkpoint 5 for operator async wait operator (2/9)#0.
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> > at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> > at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> > at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> > at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> > at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> > at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> > at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> > at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> > at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> > at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> > at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> > at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> > at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator async wait operator (2/9)#0. Failure reason: Checkpoint was declined.
> > at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> > at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
> > at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
> > at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
> > at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> > at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
> > at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
> > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
> > ... 22 more
> > Caused by: java.util.ConcurrentModificationException
> > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> > at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
> > at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> > at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> > at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> > at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> > at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
> > at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
> > at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> > at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> > at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> > at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> > at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
> > ... 33 more
> >
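On the checkpoint failure itself: the innermost cause is a ConcurrentModificationException from a HashMap iterator inside Kryo's MapSerializer.copy, reached from PartitionableListState.deepCopy, i.e. while the async operator's list state of buffered input records (Tuple2<String, ArrayList<HashMap<String, String>>>) was being copied for the snapshot. A HashMap iterator throws this when the map is structurally modified during iteration, so one of those input maps was apparently changed while the copy ran. One plausible cause, and it is only an assumption because the callback body is elided in the code below, is that the asynchbase callback writes its lookup results into the input HashMaps in place. The single-threaded sketch (CmeDemo and its methods are hypothetical) reproduces that interleaving deterministically and contrasts it with building a fresh output map.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

class CmeDemo {
    // Mimics a serializer's copy(): iterates the source map entry by entry.
    // `concurrentWriter` runs after the first entry, standing in for another thread
    // (e.g. a client callback) that touches the map while the checkpoint copies it.
    static Map<String, String> iteratingCopy(Map<String, String> src, Runnable concurrentWriter) {
        Map<String, String> dst = new HashMap<>();
        boolean first = true;
        for (Map.Entry<String, String> e : src.entrySet()) {
            dst.put(e.getKey(), e.getValue());
            if (first) {
                concurrentWriter.run();
                first = false;
            }
        }
        return dst;
    }

    // Two entries, so the iterator still has a next element after the writer runs.
    static Map<String, String> sampleInput() {
        Map<String, String> in = new HashMap<>();
        in.put("uuid", "u-1");
        in.put("event", "login");
        return in;
    }

    // Unsafe pattern: the callback adds a new key to the input record in place.
    static boolean inPlaceEnrichmentThrows() {
        Map<String, String> input = sampleInput();
        try {
            iteratingCopy(input, () -> input.put("dim_value", "from-hbase"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Safer pattern: the callback only reads the input and writes to a new map.
    static boolean copyEnrichmentThrows() {
        Map<String, String> input = sampleInput();
        Map<String, String> output = new HashMap<>();
        try {
            iteratingCopy(input, () -> {
                output.putAll(input);
                output.put("dim_value", "from-hbase");
            });
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("in-place enrichment throws CME: " + inPlaceEnrichmentThrows()); // true
        System.out.println("copy enrichment throws CME:     " + copyEnrichmentThrows());    // false
    }
}
```

In the real job the race is nondeterministic, which would fit a checkpoint that fails only occasionally; treating the input record as read-only removes the shared mutable state either way.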
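> > Analyzing the file descriptors, I found that the vast majority of them (15.4k of 15.9k) look like the entries below. I suspect the descriptors from before the restart were never released: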
> > COMMAND    PID USER    FD    TYPE DEVICE SIZE/OFF      NODE NAME
> > java    168258 yarn *648u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *652r    FIFO    0,9      0t0 850723636 pipe
> > java    168258 yarn *653w    FIFO    0,9      0t0 850723636 pipe
> > java    168258 yarn *654u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *655r    FIFO    0,9      0t0 850723637 pipe
> > java    168258 yarn *656w    FIFO    0,9      0t0 850723637 pipe
> > java    168258 yarn *657u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *658r    FIFO    0,9      0t0 850723638 pipe
> > java    168258 yarn *659w    FIFO    0,9      0t0 850723638 pipe
> > java    168258 yarn *660u a_inode   0,10        0      8670 [eventpoll]
> > java    168258 yarn *661w    FIFO    0,9      0t0 850723639 pipe
> >
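The [eventpoll] + pipe pattern in that lsof output matches what NIO selectors look like on Linux with JDK 8: the first trace shows EPollSelectorImpl creating a pipe via IOUtil.makePipe alongside its epoll instance, so each selector holds roughly three descriptors (if so, 15.4k of them would correspond to about 5,000 selectors). A rough way to get the same breakdown from inside the JVM, assuming Linux and its /proc/self/fd symlinks, is sketched below; FdBreakdown is a hypothetical name. Newer JDKs may use an eventfd instead of a pipe, so the epoll count is the more portable signal.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FdBreakdown {
    static final Path FD_DIR = Paths.get("/proc/self/fd"); // Linux only

    // Counts this JVM's descriptors by kind: "anon_inode:[eventpoll]", "pipe", "socket", "file", ...
    static Map<String, Integer> histogram() {
        Map<String, Integer> counts = new TreeMap<>();
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(FD_DIR)) {
            for (Path fd : fds) {
                String target;
                try {
                    target = Files.readSymbolicLink(fd).toString();
                } catch (IOException e) {
                    continue; // descriptor closed between listing and reading; skip it
                }
                String kind;
                if (target.startsWith("pipe:")) kind = "pipe";          // e.g. pipe:[850723636]
                else if (target.startsWith("socket:")) kind = "socket"; // e.g. socket:[12345]
                else if (target.startsWith("/")) kind = "file";         // regular paths
                else kind = target;                                     // e.g. anon_inode:[eventpoll]
                counts.merge(kind, 1, Integer::sum);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counts;
    }

    static int eventpollCount() {
        return histogram().getOrDefault("anon_inode:[eventpoll]", 0);
    }

    // Opens n selectors; the caller is responsible for closing them.
    static List<Selector> openSelectors(int n) {
        List<Selector> out = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) out.add(Selector.open());
        } catch (IOException e) {
            closeAll(out);
            throw new UncheckedIOException(e);
        }
        return out;
    }

    static void closeAll(List<Selector> selectors) {
        for (Selector s : selectors) {
            try { s.close(); } catch (IOException ignored) { }
        }
    }

    public static void main(String[] args) {
        if (!Files.isDirectory(FD_DIR)) {
            System.out.println("/proc/self/fd not available (not Linux?)");
            return;
        }
        System.out.println("before:            " + histogram());
        List<Selector> selectors = openSelectors(5); // held open on purpose, like a leak
        System.out.println("after 5 selectors: " + histogram());
        closeAll(selectors);
    }
}
```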
> > Flink version: 1.16.0
> > asynchbase version: 1.8.2
> > POM dependency:
> >         <dependency>
> >             <groupId>org.hbase</groupId>
> >             <artifactId>asynchbase</artifactId>
> >             <version>1.8.2</version>
> >             <exclusions>
> >                 <exclusion>
> >                     <groupId>org.slf4j</groupId>
> >                     <artifactId>*</artifactId>
> >                 </exclusion>
> >             </exclusions>
> >         </dependency>
> >
> > Key code:
> >         SingleOutputStreamOperator asyncFunc = AsyncDataStream
> >                 .orderedWaitWithRetry(source, new HbaseDimensionAsyncFunc(), 60, TimeUnit.SECONDS, 300, AsyFixedRetry())
> >                 .setParallelism(9)
> >                 .uid("asyncFunc");
> >
> > public class HbaseDimensionAsyncFunc extends
> >         RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, ArrayList<HashMap<String, String>>> {
> >     HBaseClient client = null;
> >
> >     @Override
> >     public void open(Configuration configuration) throws Exception {
> >         super.open(configuration);
> >         log.warn("==========Creating HBase client==========");
> >         client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
> >     }
> >
> >     @Override
> >     public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o,
> >             ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
> >         client.get(new GetRequest(HBASE_TABLE, uuid)).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
> >             // business logic
> >         });
> >     }
> > }
> >
> >
>
