Here is the line where NPE was thrown:

    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

I wonder whether runFetcher() saw running == false; otherwise
mainThread should not have been null.

Looks like we should check whether mainThread is null when shutting down.
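
A minimal sketch of that null check, assuming a simplified stand-in class
rather than the connector's real code. FetcherSketch, isRunning() and the
10-second sleep are hypothetical; only the mainThread and running names come
from the discussion above:

```java
/**
 * Simplified, hypothetical stand-in for the fetcher's shutdown path.
 * This is not the connector's actual code.
 */
final class FetcherSketch {
    private volatile boolean running = true;
    private volatile Thread mainThread; // stays null until runFetcher() passes its guard

    /** Main loop; returns immediately if shutdown was already requested. */
    void runFetcher() {
        if (!running) {
            return; // mainThread is never assigned on this path
        }
        mainThread = Thread.currentThread();
        while (running) {
            try {
                Thread.sleep(10_000L); // stand-in for the discovery interval
            } catch (InterruptedException e) {
                // woken by shutdownFetcher(); the loop condition decides whether to exit
            }
        }
    }

    /** Safe to call before, during, or instead of runFetcher(). */
    void shutdownFetcher() {
        running = false;
        Thread t = mainThread; // read once so the check and the call see the same value
        if (t != null) {
            t.interrupt();
        }
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) throws Exception {
        // Shutdown before the fetcher ever ran: mainThread is still null here.
        FetcherSketch neverStarted = new FetcherSketch();
        neverStarted.shutdownFetcher();
        neverStarted.runFetcher(); // returns at the guard

        // Shutdown while the fetcher is (most likely) sleeping.
        FetcherSketch started = new FetcherSketch();
        Thread worker = new Thread(started::runFetcher);
        worker.start();
        Thread.sleep(100L);
        started.shutdownFetcher();
        worker.join(5_000L);
        System.out.println("worker alive after shutdown: " + worker.isAlive());
    }
}
```

Copying mainThread into a local before the check avoids a second read of the
field between the null test and the interrupt() call.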

On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury <
sathi.chowdh...@elliemae.com> wrote:

> The TaskManager log does not point to a line in my code, but the error
> seems to occur while it is trying to fetch Kinesis records inside the
> connector jar.
>
>
>
> red sequence number 49572261908151269541343187919820576263466496304458235906
>
> 2017-04-13 23:28:23,470 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
>
> 2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
> java.lang.NullPointerException
>         at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>         at java.lang.Thread.run(Thread.java:745)
>
> 2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
> java.lang.NullPointerException
>         at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>         at java.lang.Thread.run(Thread.java:745)
>
> 2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED.
>
> 2017-04-13 23:28:23,471 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
>
> 2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b).
>
>
>
>
>
> *From: *Sathi Chowdhury <sathi.chowdh...@elliemae.com>
> *Date: *Thursday, April 13, 2017 at 5:44 PM
> *To: *Ted Yu <yuzhih...@gmail.com>
>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Flink errors out and job fails--IOException from
> CollectSink.open()
>
>
>
> Hi Ted, sorry for my big font earlier; it was not intended :)
>
>
>
> I am on Flink 1.2.0.
>
> I built flink-connector-kinesis_2.10-1.2.0.jar from source and included
> it in the fat jar I am running.
>
> I followed this guide:
> http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/
>
>
>
>
>
> In my code I read from a Kinesis stream using:
>
>
>
> consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
> consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST");
> consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
> consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
> consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200");
> consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
> DataStream<Map<String, Object>> stream = env.addSource(
>     new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig));
>
>
>
>
>
> When I push JSON events to the Kinesis stream, I intermittently see
> this NPE and the Flink job fails:
>
>
>
> 2017-04-14 00:31:54,672 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
> java.lang.NullPointerException
>         at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
> Thanks
>
> Sathi
>
>
>
>
>
>
>
> *From: *Ted Yu <yuzhih...@gmail.com>
> *Date: *Thursday, April 13, 2017 at 5:02 PM
> *To: *Sathi Chowdhury <sathi.chowdh...@elliemae.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Flink errors out and job fails--IOException from
> CollectSink.open()
>
>
>
> Can you give us a bit more information?
>
>
>
> release of flink
>
> snippet of your code
>
>
>
> Thanks
>
> =============Notice to Recipient: This e-mail transmission, and any
> documents, files or previous e-mail messages attached to it may contain
> information that is confidential or legally privileged, and intended for
> the use of the individual or entity named above. If you are not the
> intended recipient, or a person responsible for delivering it to the
> intended recipient, you are hereby notified that you must not read this
> transmission and that any disclosure, copying, printing, distribution or
> use of any of the information contained in or attached to this transmission
> is STRICTLY PROHIBITED. If you have received this transmission in error,
> please immediately notify the sender by telephone or return e-mail and
> delete the original transmission and its attachments without reading or
> saving in any manner. Thank you. =============
>
