[ https://issues.apache.org/jira/browse/FLINK-22340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324710#comment-17324710 ]
jiamo commented on FLINK-22340: ------------------------------- I copy the ThreasholdMeter for custom use. And got this NPE: {code:java} java.lang.NullPointerException at com.papayamobile.streaming.ThresholdMeter.getEventCountsRecentInterval(ThresholdMeter.java:67) at com.papayamobile.streaming.ThresholdMeter.checkAgainstThreshold(ThresholdMeter.java:56) at com.papayamobile.streaming.StreamingClickJob$ToClick.flatMap(StreamingClickJob.java:66) at com.papayamobile.streaming.StreamingClickJob$ToClick.flatMap(StreamingClickJob.java:42) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:986) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:111) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:314) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:331) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:328) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:970) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:209) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:125) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.AdaptivePollingRecordPublisher.lambda$run$0(AdaptivePollingRecordPublisher.java:77) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:117) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.AdaptivePollingRecordPublisher.run(AdaptivePollingRecordPublisher.java:75) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:113) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} And jus use `ThreasholdMeter` in normal case: {code:java} public static class ToClick extends RichFlatMapFunction<ObjectNode, Click> { public transient ThresholdMeter meter; public transient SimpleCounter counter; public MetricGroup group; @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { group = getRuntimeContext() .getMetricGroup().addGroup("Click"); meter = group.meter("1seconds", new ThresholdMeter(30000, Duration.ofSeconds(1))); counter = group.counter("totalCount", new SimpleCounter()); } @Override public void flatMap(ObjectNode object, Collector<Click> out) throws Exception { ObjectMapper mapper = new ObjectMapper(); Click click = mapper.readValue(object.toString(), Click.class); meter.markEvent(); counter.inc(); meter.checkAgainstThreshold(); out.collect(click); } } {code} > Concurrent access to ThresholdMeter may lead to NPE > --------------------------------------------------- > > Key: FLINK-22340 > URL: https://issues.apache.org/jira/browse/FLINK-22340 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 1.13.0 > Reporter: Xintong Song > Priority: Critical > Fix For: 1.13.0 > > > It is reported on github [1] that NPE is thrown from ThreasholdMeter, which > is likely caused by concurrent accesses. > [1] > https://github.com/apache/flink/commit/b9e576fb845b817d804da3d68471ff8a4723dcf3#commitcomment-49681105 -- This message was sent by Atlassian Jira (v8.3.4#803005)