Scott Kidder created FLINK-4536:
-----------------------------------
Summary: Possible thread leak in Task Manager
Key: FLINK-4536
URL: https://issues.apache.org/jira/browse/FLINK-4536
Project: Flink
Issue Type: Bug
Components: TaskManager
Affects Versions: 1.1.0
Reporter: Scott Kidder
I'm running Flink release 1.1.1 (commit 61bfb36) in the following configuration:
* Job Manager
* 2 x Task Manager (2 CPU cores on each Task Manager)
I've also updated the Kinesis source to use the latest AWS Java SDK, release
1.11.29.
I've got a single Flink application using all 4 slots. It consumes from a
Kinesis stream configured with 2 shards. I've limited the Kinesis source to a
parallelism of 2 as a workaround for FLINK-4341.
Occasionally the Kinesis consumer fails because provisioned-throughput limits
are hit. The application automatically restarts and resumes processing from the
checkpoint stored on the Job Manager, with no outward indication of problems.
I recently enabled the StatsD metrics reporter in Flink and noticed that the
number of threads running on each Task Manager increases by about 20 each time
the application restarts. Over the course of a day the application might hit
provisioned-throughput limits 20 times or so (it is not fully in production
yet, so hitting these limits is acceptable for now), which adds roughly 400
threads per Task Manager per day. The thread count keeps growing without bound,
even though the workload on the Task Managers does not increase.
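To see which threads accumulate across restarts, it can help to take a per-name thread census before and after a restart and compare the two. Below is a minimal standalone Java sketch of that idea; the class name `ThreadCensus` and the rule that strips numeric IDs from thread names are illustrative assumptions, not part of Flink. Because it inspects its own JVM, it would have to run inside the Task Manager process; running `jstack <pid>` against the Task Manager yields the same thread names without any code.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

public class ThreadCensus {

    // Removes numeric IDs (optionally preceded by '-' or '#') so that, for
    // example, "pool-3-thread-7" and "pool-9-thread-1" both map to "pool-thread".
    static String groupName(String threadName) {
        return threadName.replaceAll("[-#]?\\d+", "");
    }

    // Counts the live threads of the current JVM, grouped by normalized name.
    static Map<String, Integer> census() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupName(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        System.out.println("live threads: " + mx.getThreadCount());
        // One line per group: count, then normalized thread name.
        census().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

A group whose count rises by a fixed amount after every restart, and never falls back, is the likely leak.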
The following link includes charts of overall Flink cluster performance and
Task Manager JVM threads over a 12-hour period:
http://imgur.com/a/K59hz
Each decrease and subsequent spike in threads corresponds to the job being
restarted due to an AWS Kinesis source error.
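One generic way to get a fixed thread increase per restart, offered purely as a hypothesis and not as a diagnosis of Flink's code, is a component that creates a new thread pool for every job attempt and never shuts it down when the task fails. The stack traces in the Kinesis error log show `ShardConsumer` running inside a `ThreadPoolExecutor`, but whether that pool or some other one is responsible is not established here. The hypothetical `ExecutorLeakDemo` below simulates repeated failing attempts and counts surviving worker threads with and without cleanup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorLeakDemo {

    // Runs `restarts` simulated job attempts. Each attempt creates a fixed pool
    // with one thread per shard, submits work, then fails. Returns how many of
    // the worker threads created across all attempts are still alive afterwards.
    static int simulateRestarts(int restarts, int shards, boolean cleanUp)
            throws InterruptedException {
        List<Thread> created = Collections.synchronizedList(new ArrayList<>());
        for (int attempt = 0; attempt < restarts; attempt++) {
            ExecutorService pool = Executors.newFixedThreadPool(shards, r -> {
                Thread t = new Thread(r, "shard-consumer");
                t.setDaemon(true); // daemon, so leaked threads don't keep the JVM alive
                created.add(t);
                return t;
            });
            try {
                for (int s = 0; s < shards; s++) {
                    pool.submit(() -> { /* placeholder for fetching records */ });
                }
                throw new RuntimeException("simulated ProvisionedThroughputExceededException");
            } catch (RuntimeException e) {
                if (cleanUp) {
                    pool.shutdownNow();
                    pool.awaitTermination(5, TimeUnit.SECONDS);
                }
                // Without cleanUp the pool reference is simply dropped; its idle
                // core threads block on the work queue forever.
            }
        }
        int alive = 0;
        synchronized (created) {
            for (Thread t : created) {
                t.join(100); // give terminating workers a moment to exit
                if (t.isAlive()) {
                    alive++;
                }
            }
        }
        return alive;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("leaked without shutdown: " + simulateRestarts(5, 2, false));
        System.out.println("leaked with shutdown:    " + simulateRestarts(5, 2, true));
    }
}
```

Without cleanup, every attempt leaves `shards` idle threads behind, so the count grows linearly with the number of restarts, which matches the shape of the charts. Shutting the pool down in the failure path keeps it flat.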
Here are the logs from one of the Task Manager instances on startup:
{code}
2016-08-30 14:52:50,438 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - Current user: root
2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 2048 MiBytes
2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms2048M
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx2048M
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Program Arguments:
2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - --configDir
2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - /usr/local/flink-1.1.1/conf
2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Classpath: /usr/local/flink-1.1.1/lib/flink-dist_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-metrics-statsd-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-python_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.1.1/lib/slf4j-log4j12-1.7.7.jar:::
2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Registered UNIX signal handlers for [TERM, HUP, INT]
2016-08-30 14:52:50,547 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum number of open file descriptors is 1048576
2016-08-30 14:52:50,565 INFO org.apache.flink.runtime.taskmanager.TaskManager - Loading configuration from /usr/local/flink-1.1.1/conf
2016-08-30 14:52:50,610 INFO org.apache.flink.runtime.taskmanager.TaskManager - Security is not enabled. Starting non-authenticated TaskManager.
2016-08-30 14:52:50,610 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using configured hostname/address for TaskManager: 10.55.2.218
2016-08-30 14:52:50,611 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager
2016-08-30 14:52:50,615 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor system at 10.55.2.218:0
2016-08-30 14:52:50,956 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2016-08-30 14:52:51,005 INFO Remoting - Starting remoting
2016-08-30 14:52:51,159 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://[email protected]:44007]
2016-08-30 14:52:51,163 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor
2016-08-30 14:52:51,177 INFO org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig [server address: /10.55.2.218, server port: 36007, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 2 (manual), number of client threads: 2 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
2016-08-30 14:52:51,179 INFO org.apache.flink.runtime.taskmanager.TaskManager - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
2016-08-30 14:52:51,183 INFO org.apache.flink.runtime.taskmanager.TaskManager - Temporary file directory '/tmp': total 9 GB, usable 8 GB (88.89% usable)
2016-08-30 14:52:51,227 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
2016-08-30 14:52:51,287 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the currently free heap space (1377 MB), memory will be allocated lazily.
2016-08-30 14:52:51,309 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-c000b71a-d793-4f4c-90c4-0c0808154219 for spill files.
2016-08-30 14:52:51,325 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-95fd32d3-a629-42ce-816e-ca8a63e13c7d
2016-08-30 14:52:51,529 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#496823118.
2016-08-30 14:52:51,529 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: ip-10-55-2-218.ec2.internal (dataPort=36007)
2016-08-30 14:52:51,530 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).
2016-08-30 14:52:51,531 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 93/2048/2048 MB, NON HEAP: 31/32/-1 MB (used/committed/max)]
2016-08-30 14:52:51,534 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://[email protected]:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2016-08-30 14:52:51,690 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://[email protected]:6123/user/jobmanager), starting network stack and library cache.
2016-08-30 14:52:51,841 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 28 ms).
2016-08-30 14:52:51,871 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 30 ms). Listening on SocketAddress /10.55.2.218:36007.
2016-08-30 14:52:51,872 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /10.55.2.212:55892. Starting BLOB cache.
2016-08-30 14:52:51,874 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-81d7d33f-a815-4f0b-9131-d3ba5a256d1b
2016-08-30 14:52:51,889 INFO org.apache.flink.metrics.statsd.StatsDReporter - Starting StatsDReporter to send metric reports to localhost/127.0.0.1:8125
2016-08-30 14:52:51,891 INFO org.apache.flink.runtime.metrics.MetricRegistry - Periodically reporting metrics in intervals of 10 SECONDS for reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
{code}
The Kinesis reader errors look like:
{code}
2016-08-30 17:23:43,353 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 53 millis.
2016-08-30 17:23:43,725 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 597 millis.
2016-08-30 17:23:44,805 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 538 millis.
2016-08-30 17:23:45,344 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
2016-08-30 17:23:45,394 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-08-30 17:23:45,395 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
2016-08-30 17:23:45,395 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kinesis (1/2) switched to FAILED with exception.
java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Kinesis (1/2)
2016-08-30 17:23:45,396 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Kinesis (c1bf869b9d8506cea67b1317b21c014e)
2016-08-30 17:23:45,396 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-08-30 17:23:45,402 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution c1bf869b9d8506cea67b1317b21c014e
2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4)
2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) switched to CANCELING
2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) (70809fea05a6f67ffd7672bbe5b9643a).
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4)
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) switched to CANCELING
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) (a70eafedd01af499a4ee066108e92ae8).
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (b71730eb5cd484cde0eb8332e69d443e).
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (6115f675c4d36004d4c885c9868d6b61).
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (d9cb8238533083dc2396e90cae3e702e).
2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (311ffa604444b33985572fe99170f405).
2016-08-30 17:23:45,406 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
2016-08-30 17:23:45,420 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
2016-08-30 17:23:45,420 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (556a4e6d2a4b7686539b50ad6b5f0d0d).
2016-08-30 17:23:45,420 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
{code}
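The WARN lines in the Kinesis error log show the consumer backing off for randomized intervals (53, 597 and 538 ms) and then giving up after three attempts, which fails the source task and cancels the rest of the job. The sketch below reproduces that general retry pattern using full-jitter exponential backoff. It is an illustrative assumption about the technique, not Flink's `KinesisProxy` code: `JitteredRetry` and `ThrottledException` (a stand-in for the AWS SDK's `ProvisionedThroughputExceededException`) are hypothetical, as are the base, cap and exponent parameters.

```java
import java.util.Random;
import java.util.concurrent.Callable;

public class JitteredRetry {

    // Hypothetical stand-in for the AWS SDK's ProvisionedThroughputExceededException.
    static class ThrottledException extends RuntimeException {
        ThrottledException(String message) {
            super(message);
        }
    }

    // Full-jitter backoff: a uniformly random delay in [0, min(max, base * power^attempt)).
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt, Random rnd) {
        long ceiling = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (rnd.nextDouble() * ceiling);
    }

    // Calls `call` up to maxAttempts times, backing off after each throttled attempt,
    // and gives up with a RuntimeException once every attempt has been throttled.
    static <T> T callWithRetries(Callable<T> call, int maxAttempts, long baseMillis,
                                 long maxMillis, double power, Random rnd) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (ThrottledException e) {
                long backoff = fullJitterBackoff(baseMillis, maxMillis, power, attempt, rnd);
                System.out.println("Throttled. Backing off for " + backoff + " millis.");
                Thread.sleep(backoff);
            }
        }
        throw new RuntimeException("Rate exceeded - all " + maxAttempts
                + " retry attempts were throttled.");
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Succeeds on the third call, after two throttled attempts.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new ThrottledException("throttled");
            }
            return "records";
        }, 3, 50, 1000, 2.0, new Random(42));
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Jitter spreads out retries from parallel subtasks, but with only three attempts a sustained throttling period still exhausts the retries within a couple of seconds, and the resulting exception is what triggers each job restart.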
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)