On further investigation, it seems to me that the I/O exception I posted
previously is not the cause of the TM being lost. It is an aftereffect of the
TM being shut down: the channel is closed after a record is emitted but
before it's processed.

Previously, the logs didn't show this error, and I'm unable to reproduce it
consistently (I've come across the I/O exception only twice so far). Most of
the time, the logs contain neither the I/O exception nor any other
exception/error messages.

This is what the logs usually look like (without the I/O exception):
Job Manager:
/
2017-10-12 22:22:41,857 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Container container_1507845873691_0001_01_000008 failed. Exit status: -100
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Diagnostics for container container_1507845873691_0001_01_000008 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Total number of failed containers so far: 1
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 1
2017-10-12 22:22:42,096 INFO  org.apache.flink.yarn.YarnJobManager              
           
- Task manager akka.tcp://flink@ip-172-31-43-115:43404/user/taskmanager
terminated.
2017-10-12 22:22:42,210 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:85)) (39/96) (530ca4789a921cab363f241176dac7a8)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
container_1507845873691_0001_01_000008 @
ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
        at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,451 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Output
(0c45ba62b56fefd1c1e7bfd68923411d) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
container_1507845873691_0001_01_000008 @
ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
        at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,907 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:85)) (1/96) (8cf2869e9786809d1b9b9d12b9467e40)
switched from RUNNING to CANCELING.
/
 
Task Manager:
/
2017-10-12 22:22:38,570 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369],
[G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 5973/17600/17600 MB, NON HEAP: 72/73/-1 MB
(used/committed/max)]
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 3150, Total Capacity: 17138363, Used Memory: 17138364
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace:
47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369],
[G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 6101/17600/17600 MB, NON HEAP: 72/73/-1 MB
(used/committed/max)]
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 3234, Total Capacity: 17139035, Used Memory: 17139036
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace:
47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369],
[G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
*2017-10-12 22:22:38,709 INFO  org.apache.flink.yarn.YarnTaskManagerRunner      
            
- RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 22:22:38,713 INFO  org.apache.flink.runtime.blob.BlobCache           
           
- Shutting down BlobCache
2017-10-12 22:22:38,719 INFO 
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
removed spill file directory
/mnt/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-a5aace05-73ed-4cea-ad07-db86f9f8ce21
2017-10-12 22:22:38,719 INFO 
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
removed spill file directory
/mnt1/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-cb34ffbe-879f-47d4-9df3-6ed2b0dcd799
/
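
To see how full the heap was shortly before the SIGTERM, the periodic
"Memory usage stats" entries in the Task Manager log can be parsed. Below is
a minimal sketch in Java, assuming the entry format shown above
(used/committed/max in MB). HeapStatsParser, parseHeapMb and usedPercent are
hypothetical names for illustration, not part of Flink.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): extracts the heap figures from a
// TaskManager "Memory usage stats" log entry.
final class HeapStatsParser {
    // Matches e.g. "[HEAP: 6101/17600/17600 MB" (used/committed/max).
    // The leading bracket keeps "NON HEAP: ..." from matching.
    private static final Pattern HEAP =
            Pattern.compile("\\[HEAP: (\\d+)/(\\d+)/(\\d+) MB");

    /** Returns {used, committed, max} in MB, or null if no heap stats are found. */
    static long[] parseHeapMb(String logText) {
        Matcher m = HEAP.matcher(logText);
        if (!m.find()) {
            return null;
        }
        return new long[] {
            Long.parseLong(m.group(1)),
            Long.parseLong(m.group(2)),
            Long.parseLong(m.group(3))
        };
    }

    /** Heap utilisation as a percentage of the maximum heap size. */
    static double usedPercent(long[] heapMb) {
        return 100.0 * heapMb[0] / heapMb[2];
    }

    public static void main(String[] args) {
        // Last heap entry before the SIGTERM in the log above.
        String line = "stats: [HEAP: 6101/17600/17600 MB, NON HEAP: 72/73/-1 MB";
        long[] heap = parseHeapMb(line);
        System.out.println("heap used: " + heap[0] + " MB of " + heap[2]
                + " MB (" + usedPercent(heap) + " %)");
    }
}
```

For the last entry above (6101/17600/17600 MB) this gives a heap utilisation
of roughly 35%.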





This is what the logs sometimes look like (with the I/O exception):
Job Manager:
/
2017-10-12 19:40:37,669 WARN  akka.remote.ReliableDeliverySupervisor            
           
- Association with remote system [akka.tcp://flink@ip-172-31-11-129:43340]
has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Container container_1507836035753_0001_01_000015 failed. Exit status: -100
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Diagnostics for container container_1507836035753_0001_01_000015 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Total number of failed containers so far: 1
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Container container_1507836035753_0001_01_000013 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Diagnostics for container container_1507836035753_0001_01_000013 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Total number of failed containers so far: 2
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Container container_1507836035753_0001_01_000002 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Diagnostics for container container_1507836035753_0001_01_000002 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Total number of failed containers so far: 3
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Container container_1507836035753_0001_01_000003 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Diagnostics for container container_1507836035753_0001_01_000003 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Total number of failed containers so far: 4
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 1
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 2
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 3
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnJobManager              
           
- Task manager akka.tcp://flink@ip-172-31-1-178:33620/user/taskmanager
terminated.
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
           
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 4
2017-10-12 19:40:37,925 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:87)) (40/136) (748d815623ff13e6357f351d5aa7b0f4)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
container_1507836035753_0001_01_000015 @
ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
        at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 19:40:37,931 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Output
(11771c44eace0a1e32de1c3ca1c60b09) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
container_1507836035753_0001_01_000015 @
ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
        at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
/

Task Manager:
/
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 1347/17600/17600 MB, NON HEAP: 73/74/-1 MB
(used/committed/max)]
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 162, Total Capacity: 17111387, Used Memory: 17111388
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace:
48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363],
[G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 1467/17600/17600 MB, NON HEAP: 73/74/-1 MB
(used/committed/max)]
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 196, Total Capacity: 17111659, Used Memory: 17111660
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace:
48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363],
[G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
*2017-10-12 19:40:35,033 INFO  org.apache.flink.yarn.YarnTaskManagerRunner      
            
- RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 19:40:35,043 ERROR org.apache.flink.runtime.operators.BatchTask      
           
- Error in task code:  CHAIN Partition -> FlatMap (FlatMap at
main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:87))
(86/136)
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O
channel already closed. Could not fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
        at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
        at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at
org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
        at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
        at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
        at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
        at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
        at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not
fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
        at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
        at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
        at
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
        at
org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
        at
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
        at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
        at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
        at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
        at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        ... 13 more
2017-10-12 19:40:35,050 INFO 
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
removed spill file directory
/mnt/yarn/usercache/hadoop/appcache/application_1507836035753_0001/flink-io-81d98a3a-7a40-438f-93fa-3b1f9dfc1e1d
/

I still can't figure out why the TM shuts down or how to avoid it; it seems
like a memory/GC issue. Previously, I was able to get the job to complete by
increasing the parallelism (number of task managers). But as my dataset size
has increased, I'm running into this issue again, and increasing the
parallelism no longer helps.
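
As a starting point for telling a memory kill apart from a lost node, the
exit status that YARN reports in the Job Manager log can be mapped to its
meaning. Below is a minimal sketch in Java, assuming the constant values of
Hadoop's org.apache.hadoop.yarn.api.records.ContainerExitStatus. They are
copied by hand here, so they should be verified against the Hadoop version
in use. ExitStatusHint and describe are hypothetical names for illustration.

```java
// Hypothetical helper (not part of Flink or YARN). The numeric values are
// assumed to match org.apache.hadoop.yarn.api.records.ContainerExitStatus.
final class ExitStatusHint {

    /** Returns a short description of a YARN container exit status. */
    static String describe(int exitStatus) {
        switch (exitStatus) {
            case 0:
                return "SUCCESS";
            case -100:
                return "ABORTED: released by the application or lost with its node";
            case -101:
                return "DISKS_FAILED: too many local disks failed on the node";
            case -102:
                return "PREEMPTED: container was preempted by the scheduler";
            case -103:
                return "KILLED_EXCEEDED_VMEM: virtual memory limit exceeded";
            case -104:
                return "KILLED_EXCEEDED_PMEM: physical memory limit exceeded";
            default:
                return "UNKNOWN: not covered by this sketch (" + exitStatus + ")";
        }
    }

    public static void main(String[] args) {
        // Exit status reported for the failed containers in the logs above.
        System.out.println(describe(-100));
    }
}
```

With the -100 seen in both runs above, this reports ABORTED, which is a
different code from the ones YARN uses when a container exceeds its memory
limits (-103 and -104). That alone doesn't rule out memory pressure on the
node itself.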

Any help would be greatly appreciated! 

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
