I have redone the pipeline with flink-1.3-SNAPSHOT and running on EMR 5.4, the
aws-sdk-java latest libraries directly in flink lib dir.
I have come to the point where I get the java.net.SocketException: Broken pipe
(Write failed). Eager to get a reply and a clue on this!
java.io.IOException: Error sending data back to client
(ip-172-31-42-238/172.31.42.238:42753)
at
org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:679)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException:
Broken pipe (Write failed)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:202)
at
org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
... 6 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at
org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
... 17 more
Thanks
From: Sathi Chowdhury <[email protected]>
Date: Friday, April 14, 2017 at 1:29 AM
To: "[email protected]" <[email protected]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()
I am consistently seeing the same behavior…tried with elevated memory for job
manager and taskmanager
taskmanager.rpc.port: 6123
taskmanager.data.port: 4964
taskmanager.heap.mb: 39000
taskmanager.numberOfTaskSlots: 1
taskmanager.network.numberOfBuffers: 16368
taskmanager.memory.preallocate: false
parallelization.degree.default: 4
even though the jobmanager is restarting the flink job ,but the subtasks once
reach to cancelled state does not revive
I have no clue how to approach this.
Thanks
From: Sathi Chowdhury <[email protected]>
Date: Thursday, April 13, 2017 at 9:52 PM
To: Ted Yu <[email protected]>, "[email protected]"
<[email protected]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()
In my jobmanager log I see this exception , probably is the root cause why the
whole job is killed…is there any memory problem in jobmanager ? any clue for
this error below?
I ran the yarn-session
And my flink-conf.yaml is pretty much unmodified
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1
invoke of yarn-session.sh
HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1
-tm 2048
Thanks
The error in job manager is as below
2017-04-14 04:26:33,570 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map ->
Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to
FAILED.
java.lang.RuntimeException: Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253)
at
com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234)
at
org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213)
at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error sending data back to client
(ip-172-31-31-172/172.31.31.172:46147)
at
org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
... 12 more
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException:
Connection reset
at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203)
at
org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
... 14 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at
org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
... 25 more
2017-04-14 04:26:33,572 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink
Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to
FAILING.
2017-04-14 04:49:10,707 WARN akka.remote.ReliableDeliverySupervisor
- Association with remote system
[akka.tcp://[email protected]:37737] has
failed, address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://[email protected]:37737]] Caused
by: [Connection refused:
ip-172-31-31-172.us-west-2.compute.internal/172.31.31.172:37737]
2017-04-14 04:49:10,712 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source (1/1) (b9af6dd459e576a669a12b08a830c24e) switched from DEPLOYING to
RUNNING.
2017-04-14 04:49:10,713 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source (1/1) (c5fe4b6df26e03f4c2604ec89a92ad8f) switched from DEPLOYING to
RUNNING.
2017-04-14 04:49:10,715 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source (1/1) (7f614d51527252cbb9ffcfc0d660fe4e) switched from DEPLOYING to
RUNNING.
2017-04-14 04:49:10,717 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map
(1/1) (cd1f1ecf9408b4ac58bea6be99e5c89a) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,718 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map ->
Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from DEPLOYING
to RUNNING.
2017-04-14 04:49:10,740 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map ->
Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from RUNNING to
FAILED.
java.io.IOException: Cannot connect to the client to send back the stream
at
org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:80)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:244)
at
org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:75)
... 6 more
From: Ted Yu <[email protected]>
Date: Thursday, April 13, 2017 at 6:01 PM
To: Sathi Chowdhury <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()
Here is the line where NPE was thrown:
mainThread.interrupt(); // the main thread may be sleeping for the
discovery interval
I wonder if runFetcher() encountered running being false - otherwise mainThread
should not have been null.
Looks like we should check whether mainThread is null when shutting down.
On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury
<[email protected]<mailto:[email protected]>> wrote:
The taskmanger log does not point a line in my code ..but it seems like the
error occurs while it is trying to fetch kinesis record inside connector jar
red sequence number 49572261908151269541343187919820576263466496304458235906
2017-04-13 23:28:23,470 INFO
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Subtask 0
is seeding the fetcher with restored shard
KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId:
shardId-000000000009,HashKeyRange: {StartingHashKey:
306254130228844617117037146688591390310,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49572254078827945986407789245674345090539511066904232082,}}'}, starting state
set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,471 WARN
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Error
while closing Kinesis data fetcher
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,471 WARN
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Error
while closing Kinesis data fetcher
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,472 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b)
switched from CANCELING to CANCELED.
2017-04-13 23:28:23,471 INFO
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Subtask 0
is seeding the fetcher with restored shard
KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId:
shardId-000000000006,HashKeyRange: {StartingHashKey:
204169420152563078078024764459060926873,EndingHashKey:
238197656844656924424362225202237748018},SequenceNumberRange:
{StartingSequenceNumber:
49572254078761043750812197376249737935721565982386290786,}}'}, starting state
set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,472 INFO org.apache.flink.runtime.taskmanager.Task
- Freeing task resources for Source: Custom Source (1/1)
(8a7301a437cb2d052208ee42c994104b).
From: Sathi Chowdhury
<[email protected]<mailto:[email protected]>>
Date: Thursday, April 13, 2017 at 5:44 PM
To: Ted Yu <[email protected]<mailto:[email protected]>>
Cc: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()
Hi Ted, Sorry for my big font earlier…was not intended ☺
I am on flink 1.2.0
I built flink-connector-kinesis_2.10-1.2.0.jar from source and included in the
fatjar I am running.
Followed this
http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/<https://na01.safelinks.protection.outlook.com/?url=http%3A%2F%2Fwww.kidder.io%2F2017%2F02%2F15%2Fflink-kinesis-streaming-connector%2F&data=01%7C01%7C%7Cfc97be832d1a4062d75608d482cf675f%7C0d009d13c2cd47d891dd2ae838b00d4b%7C0&sdata=avPl4%2FU5DTjZW09Zby9CeUttUNpiGWH%2Bvnycy9PhUDA%3D&reserved=0>
From code I read a kinesis stream using
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
"LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
"AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
"200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"2000");
DataStream<Map<String, Object>> stream = env.addSource(new
FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(),
consumerConfig));
While I push the json event to the Kinesis stream intermittently I see this NPE
and flink job fails
2017-04-14 00:31:54,672 WARN
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Error
while closing Kinesis data fetcher
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Thanks
Sathi
From: Ted Yu <[email protected]<mailto:[email protected]>>
Date: Thursday, April 13, 2017 at 5:02 PM
To: Sathi Chowdhury
<[email protected]<mailto:[email protected]>>
Cc: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()
Can you give us a bit more information ?
release of flink
snippet of your code
Thanks
=============Notice to Recipient: This e-mail transmission, and any documents,
files or previous e-mail messages attached to it may contain information that
is confidential or legally privileged, and intended for the use of the
individual or entity named above. If you are not the intended recipient, or a
person responsible for delivering it to the intended recipient, you are hereby
notified that you must not read this transmission and that any disclosure,
copying, printing, distribution or use of any of the information contained in
or attached to this transmission is STRICTLY PROHIBITED. If you have received
this transmission in error, please immediately notify the sender by telephone
or return e-mail and delete the original transmission and its attachments
without reading or saving in any manner. Thank you. =============
=============Notice to Recipient: This e-mail transmission, and any documents,
files or previous e-mail messages attached to it may contain information that
is confidential or legally privileged, and intended for the use of the
individual or entity named above. If you are not the intended recipient, or a
person responsible for delivering it to the intended recipient, you are hereby
notified that you must not read this transmission and that any disclosure,
copying, printing, distribution or use of any of the information contained in
or attached to this transmission is STRICTLY PROHIBITED. If you have received
this transmission in error, please immediately notify the sender by telephone
or return e-mail and delete the original transmission and its attachments
without reading or saving in any manner. Thank you. =============
=============Notice to Recipient: This e-mail transmission, and any documents,
files or previous e-mail messages attached to it may contain information that
is confidential or legally privileged, and intended for the use of the
individual or entity named above. If you are not the intended recipient, or a
person responsible for delivering it to the intended recipient, you are hereby
notified that you must not read this transmission and that any disclosure,
copying, printing, distribution or use of any of the information contained in
or attached to this transmission is STRICTLY PROHIBITED. If you have received
this transmission in error, please immediately notify the sender by telephone
or return e-mail and delete the original transmission and its attachments
without reading or saving in any manner. Thank you. =============
=============Notice to Recipient: This e-mail transmission, and any documents,
files or previous e-mail messages attached to it may contain information that
is confidential or legally privileged, and intended for the use of the
individual or entity named above. If you are not the intended recipient, or a
person responsible for delivering it to the intended recipient, you are hereby
notified that you must not read this transmission and that any disclosure,
copying, printing, distribution or use of any of the information contained in
or attached to this transmission is STRICTLY PROHIBITED. If you have received
this transmission in error, please immediately notify the sender by telephone
or return e-mail and delete the original transmission and its attachments
without reading or saving in any manner. Thank you. =============
=============Notice to Recipient: This e-mail transmission, and any documents,
files or previous e-mail messages attached to it may contain information that
is confidential or legally privileged, and intended for the use of the
individual or entity named above. If you are not the intended recipient, or a
person responsible for delivering it to the intended recipient, you are hereby
notified that you must not read this transmission and that any disclosure,
copying, printing, distribution or use of any of the information contained in
or attached to this transmission is STRICTLY PROHIBITED. If you have received
this transmission in error, please immediately notify the sender by telephone
or return e-mail and delete the original transmission and its attachments
without reading or saving in any manner. Thank you. =============