[ 
https://issues.apache.org/jira/browse/FLINK-22340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324710#comment-17324710
 ] 

jiamo commented on FLINK-22340:
-------------------------------

I copied the ThresholdMeter class for custom use and got this NPE:

 
{code:java}
java.lang.NullPointerException
    at com.papayamobile.streaming.ThresholdMeter.getEventCountsRecentInterval(ThresholdMeter.java:67)
    at com.papayamobile.streaming.ThresholdMeter.checkAgainstThreshold(ThresholdMeter.java:56)
    at com.papayamobile.streaming.StreamingClickJob$ToClick.flatMap(StreamingClickJob.java:66)
    at com.papayamobile.streaming.StreamingClickJob$ToClick.flatMap(StreamingClickJob.java:42)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:986)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:111)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:314)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:331)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:328)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:970)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:209)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:125)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.AdaptivePollingRecordPublisher.lambda$run$0(AdaptivePollingRecordPublisher.java:77)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:117)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.AdaptivePollingRecordPublisher.run(AdaptivePollingRecordPublisher.java:75)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:113)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
And I just use `ThresholdMeter` in the normal way:
{code:java}
   public static class ToClick extends RichFlatMapFunction<ObjectNode, Click> {
      public transient ThresholdMeter meter;
      public transient SimpleCounter counter;
      public MetricGroup group;

      @Override
      public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
         group = getRuntimeContext()
               .getMetricGroup().addGroup("Click");
         meter = group.meter("1seconds", new ThresholdMeter(30000, Duration.ofSeconds(1)));
         counter = group.counter("totalCount", new SimpleCounter());
      }


      @Override
      public void flatMap(ObjectNode object, Collector<Click> out) throws Exception {

          ObjectMapper mapper = new ObjectMapper();
          Click click = mapper.readValue(object.toString(), Click.class);
          meter.markEvent();
          counter.inc();
          meter.checkAgainstThreshold();
          out.collect(click);

      }
   }
{code}
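If the NPE here does come from concurrent access (the trace above runs inside a Kinesis ShardConsumer thread), one possible workaround for a copied meter is to guard all of its state with a single lock. The following is only a minimal sketch under that assumption, not Flink's implementation or its eventual fix: `SynchronizedThresholdMeter`, its constructor parameters, the injectable `clock`, and the use of `IllegalStateException` are hypothetical names and choices made for illustration. It also assumes the copied class keeps its event timestamps in a queue that is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a copied threshold meter; not Flink's class. */
final class SynchronizedThresholdMeter {
    private final long maxEventsPerInterval;
    private final long intervalMillis;
    private final LongSupplier clock; // injectable so the window logic can be tested
    private final Deque<Long> eventTimestamps = new ArrayDeque<>();

    SynchronizedThresholdMeter(long maxEventsPerInterval, long intervalMillis, LongSupplier clock) {
        this.maxEventsPerInterval = maxEventsPerInterval;
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    /** Records one event at the current clock time. */
    synchronized void markEvent() {
        eventTimestamps.addLast(clock.getAsLong());
    }

    /** Drops timestamps older than the interval and returns how many remain. */
    synchronized int getEventCountsRecentInterval() {
        long now = clock.getAsLong();
        // While the lock is held, no other thread can empty the deque between the
        // isEmpty() check and the unboxing of peekFirst(), so no null is unboxed.
        while (!eventTimestamps.isEmpty() && now - eventTimestamps.peekFirst() > intervalMillis) {
            eventTimestamps.removeFirst();
        }
        return eventTimestamps.size();
    }

    /** Throws if more events than allowed were recorded in the recent interval. */
    synchronized void checkAgainstThreshold() {
        int recent = getEventCountsRecentInterval();
        if (recent > maxEventsPerInterval) {
            throw new IllegalStateException(
                    recent + " events in the last " + intervalMillis + " ms exceeds the threshold");
        }
    }
}
```

In a job like the one above, such a meter could be built with `System::currentTimeMillis` as the clock. Locking on every record adds some overhead, so this is a trade-off rather than a free fix.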
 

 

> Concurrent access to ThresholdMeter may lead to NPE
> ---------------------------------------------------
>
>                 Key: FLINK-22340
>                 URL: https://issues.apache.org/jira/browse/FLINK-22340
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Xintong Song
>            Priority: Critical
>             Fix For: 1.13.0
>
>
> It was reported on GitHub [1] that an NPE is thrown from ThresholdMeter, which 
> is likely caused by concurrent access.
> [1] https://github.com/apache/flink/commit/b9e576fb845b817d804da3d68471ff8a4723dcf3#commitcomment-49681105



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
