[ 
https://issues.apache.org/jira/browse/FLINK-21963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias updated FLINK-21963:
-----------------------------
    Description: 
[This 
build|https://dev.azure.com/mapohl/flink/_build/results?buildId=360&view=logs&j=e0582806-6d85-5dc5-7eb4-4289d3d0de6b&t=9fea6cf4-6ce3-5c26-d059-69f4d4cec7d1&l=4442]
 failed (not exclusively) due to 
{{ReactiveModelITCase.testScaleDownOnTaskManagerLoss}}.

I was able to reproduce it locally having the {{DefaultScheduler}} enabled. The 
test seems to get into an infinite loop:

{quote}
[...]
76125 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FAILED to JobManager for task Source: Custom 
Source -> Sink: Unnamed (4/4)#8738 92b920a905c55fc85a76c79b3acef161.
76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Returning 
logical slot to shared slot (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot externally (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Free reserved slot aec00279d7404b26a104ee906695d27a.
76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
from state Executing to Restarting.
76125 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming 
Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state RUNNING to 
CANCELLING.
76125 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (2/4) (843d0c154f55a15a9bb1e705ae282032) switched from 
RUNNING to CANCELING.
76125 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (3/4) (ba0dd94db26abc376ee73522410b8094) switched from 
RUNNING to CANCELING.
76125 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (4/4) (92b920a905c55fc85a76c79b3acef161) switched from 
RUNNING to CANCELING.
76126 [flink-akka.actor.default-dispatcher-4] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 843d0c154f55a15a9bb1e705ae282032.
76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution ba0dd94db26abc376ee73522410b8094.
76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 92b920a905c55fc85a76c79b3acef161.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (3/4) (ba0dd94db26abc376ee73522410b8094) switched from 
CANCELING to CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring transition 
of vertex Source: Custom Source -> Sink: Unnamed (3/4) - execution #8738 to 
FAILED while being CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Returning 
logical slot to shared slot (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot externally (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Free reserved slot dde6780a1f8df3d0b1b1b454e28f8566.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
'newResourcesAvailable' because the actual state is Restarting and not 
ResourceConsumer.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
results produced by task execution ba0dd94db26abc376ee73522410b8094.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (2/4) (843d0c154f55a15a9bb1e705ae282032) switched from 
CANCELING to CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring transition 
of vertex Source: Custom Source -> Sink: Unnamed (2/4) - execution #8739 to 
FAILED while being CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Returning 
logical slot to shared slot (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot externally (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Free reserved slot 25dd5bd3007772fe2cc69568cad2d882.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
'newResourcesAvailable' because the actual state is Restarting and not 
ResourceConsumer.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
results produced by task execution 843d0c154f55a15a9bb1e705ae282032.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (4/4) (92b920a905c55fc85a76c79b3acef161) switched from 
CANCELING to CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring transition 
of vertex Source: Custom Source -> Sink: Unnamed (4/4) - execution #8738 to 
FAILED while being CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Returning 
logical slot to shared slot (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot externally (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Free reserved slot 3315697ecf20a1249d7dad268892bcc9.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
shared slot (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
'newResourcesAvailable' because the actual state is Restarting and not 
ResourceConsumer.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming 
Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state CANCELLING to 
CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ExecutionGraph 
4b5f437c7c47c8be9f8d8bf08e78910a reached terminal state CANCELED.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping 
checkpoint coordinator for job 4b5f437c7c47c8be9f8d8bf08e78910a.
76126 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
results produced by task execution 92b920a905c55fc85a76c79b3acef161.
76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution ba0dd94db26abc376ee73522410b8094.
76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 92b920a905c55fc85a76c79b3acef161.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Ignoring 
scheduled action because expected state 
org.apache.flink.runtime.scheduler.adaptive.Executing@480dd446 is not the 
actual state org.apache.flink.runtime.scheduler.adaptive.Restarting@64a59f58.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Declare new resource requirements for job 4b5f437c7c47c8be9f8d8bf08e78910a.
        required resources: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=32768}]
        acquired resources: 
ResourceCounter{resources={ResourceProfile{UNKNOWN}=4}}
76126 [flink-akka.actor.default-dispatcher-4] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 843d0c154f55a15a9bb1e705ae282032.
76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
from state Restarting to WaitingForResources.
76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution ba0dd94db26abc376ee73522410b8094.
76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 92b920a905c55fc85a76c79b3acef161.
76127 [SIGINT handler] WARN  org.apache.flink.util.TestSignalHandler [] - 
RECEIVED SIGNAL 2: SIGINT. Shutting down as requested.
76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
from state WaitingForResources to CreatingExecutionGraph.
76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 843d0c154f55a15a9bb1e705ae282032.
76127 [jobmanager-future-thread-9] INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Running 
initialization on master for job Flink Streaming Job 
(4b5f437c7c47c8be9f8d8bf08e78910a).
76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution ba0dd94db26abc376ee73522410b8094.
76127 [jobmanager-future-thread-9] INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Successfully 
ran initialization on master in 0 ms.
76127 [jobmanager-future-thread-9] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Adding 1 
vertices from job graph Flink Streaming Job (4b5f437c7c47c8be9f8d8bf08e78910a).
76127 [jobmanager-future-thread-9] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Attaching 1 
topologically sorted vertices to existing job graph with 0 vertices and 0 
intermediate results.
76127 [jobmanager-future-thread-9] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Connecting 
ExecutionJobVertex cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom Source -> 
Sink: Unnamed) to 0 predecessors.
76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 92b920a905c55fc85a76c79b3acef161.
76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
stop for execution 843d0c154f55a15a9bb1e705ae282032.
76127 [jobmanager-future-thread-9] INFO  
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
4 pipelined regions in 0 ms
76127 [jobmanager-future-thread-9] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Successfully 
created execution graph from job graph Flink Streaming Job 
(4b5f437c7c47c8be9f8d8bf08e78910a).
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'totalNumberOfCheckpoints'. Metric will not be reported.[localhost, jobmanager, 
Flink Streaming Job]
76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring transition 
of vertex Source: Custom Source -> Sink: Unnamed (3/4) - execution #8738 to 
FAILED while being CANCELED.
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'numberOfInProgressCheckpoints'. Metric will not be reported.[localhost, 
jobmanager, Flink Streaming Job]
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'numberOfCompletedCheckpoints'. Metric will not be reported.[localhost, 
jobmanager, Flink Streaming Job]
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'numberOfFailedCheckpoints'. Metric will not be reported.[localhost, 
jobmanager, Flink Streaming Job]
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'lastCheckpointRestoreTimestamp'. Metric will not be reported.[localhost, 
jobmanager, Flink Streaming Job]
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'lastCheckpointSize'. Metric will not be reported.[localhost, jobmanager, Flink 
Streaming Job]
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'lastCheckpointDuration'. Metric will not be reported.[localhost, jobmanager, 
Flink Streaming Job]
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'lastCheckpointProcessedData'. Metric will not be reported.[localhost, 
jobmanager, Flink Streaming Job]
76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring transition 
of vertex Source: Custom Source -> Sink: Unnamed (4/4) - execution #8738 to 
FAILED while being CANCELED.
76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'lastCheckpointPersistedData'. Metric will not be reported.[localhost, 
jobmanager, Flink Streaming Job]
76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'lastCheckpointExternalPath'. Metric will not be reported.[localhost, 
jobmanager, Flink Streaming Job]
76128 [jobmanager-future-thread-9] INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - No state 
backend has been configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@90aee29
76128 [jobmanager-future-thread-9] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - The 
configuration state.checkpoint-storage has not be set in the current sessions 
flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
strongly encouraged explicitly set this configuration so they understand how 
their applications are checkpointing snapshots for fault-tolerance.
76128 [jobmanager-future-thread-9] INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Checkpoint 
storage is set to JobManager
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring transition 
of vertex Source: Custom Source -> Sink: Unnamed (2/4) - execution #8739 to 
FAILED while being CANCELED.
76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 
'restartingTime'. Metric will not be reported.[localhost, jobmanager, Flink 
Streaming Job]
76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 'downtime'. 
Metric will not be reported.[localhost, jobmanager, Flink Streaming Job]
76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
[] - Name collision: Group already contains a Metric with the name 'uptime'. 
Metric will not be reported.[localhost, jobmanager, Flink Streaming Job]
76128 [jobmanager-future-thread-9] DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Status of the 
shared state registry of job 4b5f437c7c47c8be9f8d8bf08e78910a after restore: 
SharedStateRegistry{registeredStates={}}.
76128 [jobmanager-future-thread-9] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint 
found during restore.
76128 [jobmanager-future-thread-9] DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Resetting the 
master hooks.
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming 
Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state CREATED to RUNNING.
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
Reserve free slot with allocation id aec00279d7404b26a104ee906695d27a.
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
Allocating logical slot from shared slot 
(SlotRequestId{88b040e446ee408f792334a2ec437a42})
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
Reserve free slot with allocation id 25dd5bd3007772fe2cc69568cad2d882.
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
Allocating logical slot from shared slot 
(SlotRequestId{96939e4bb685186000b4001d96082081})
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
Reserve free slot with allocation id dde6780a1f8df3d0b1b1b454e28f8566.
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
Allocating logical slot from shared slot 
(SlotRequestId{c879c9aa7a7cb6dfbc12502ce7a8ed12})
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
Reserve free slot with allocation id 3315697ecf20a1249d7dad268892bcc9.
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
Allocating logical slot from shared slot 
(SlotRequestId{53a25924bcd9fe2db23c22e0bf17effe})
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Successfully 
reserved and assigned the required slots for the ExecutionGraph.
76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
from state CreatingExecutionGraph to Executing.
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched from 
CREATED to DEPLOYING.
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
Custom Source -> Sink: Unnamed (1/4) (attempt #8740) with attempt id 
9132ba5f6b087654fb351138ce74e710 to 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ 
localhost (dataPort=-1) with allocation id aec00279d7404b26a104ee906695d27a
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (2/4) (7384c32213a4e9cd3aa6ee5875b1e532) switched from 
CREATED to DEPLOYING.
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
Custom Source -> Sink: Unnamed (2/4) (attempt #8740) with attempt id 
7384c32213a4e9cd3aa6ee5875b1e532 to 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ 
localhost (dataPort=-1) with allocation id 25dd5bd3007772fe2cc69568cad2d882
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (3/4) (497dd733deeb255e05bae82f0e41527d) switched from 
CREATED to DEPLOYING.
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
Custom Source -> Sink: Unnamed (3/4) (attempt #8739) with attempt id 
497dd733deeb255e05bae82f0e41527d to 6cf04c09-5378-4bf3-aedd-e9a52076ec99 @ 
localhost (dataPort=-1) with allocation id dde6780a1f8df3d0b1b1b454e28f8566
76128 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
aec00279d7404b26a104ee906695d27a.
76128 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
dde6780a1f8df3d0b1b1b454e28f8566.
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (4/4) (639cf6da5847c7f4250839aeb2552df9) switched from 
CREATED to DEPLOYING.
76128 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
Custom Source -> Sink: Unnamed (4/4) (attempt #8739) with attempt id 
639cf6da5847c7f4250839aeb2552df9 to 6cf04c09-5378-4bf3-aedd-e9a52076ec99 @ 
localhost (dataPort=-1) with allocation id 3315697ecf20a1249d7dad268892bcc9
76128 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
cbc357ccb763df2852fee8c4fc7d55f2 - 2 under allocation id 
dde6780a1f8df3d0b1b1b454e28f8566: 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@6d90093b
76128 [flink-akka.actor.default-dispatcher-4] DEBUG 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
cbc357ccb763df2852fee8c4fc7d55f2 - 0 under allocation id 
aec00279d7404b26a104ee906695d27a: 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@64e96e04
76128 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
Custom Source -> Sink: Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d), 
deploy into slot with allocation id dde6780a1f8df3d0b1b1b454e28f8566.
76129 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
3315697ecf20a1249d7dad268892bcc9.
76129 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
Custom Source -> Sink: Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710), 
deploy into slot with allocation id aec00279d7404b26a104ee906695d27a.
76129 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from CREATED to 
DEPLOYING.
76129 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream leak 
safety net for task Source: Custom Source -> Sink: Unnamed (3/4)#8739 
(497dd733deeb255e05bae82f0e41527d) [DEPLOYING]
76129 [flink-akka.actor.default-dispatcher-2] DEBUG 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
cbc357ccb763df2852fee8c4fc7d55f2 - 3 under allocation id 
3315697ecf20a1249d7dad268892bcc9: 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@23a8b928
76130 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from CREATED to 
DEPLOYING.
76130 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream leak 
safety net for task Source: Custom Source -> Sink: Unnamed (1/4)#8740 
(9132ba5f6b087654fb351138ce74e710) [DEPLOYING]
76130 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
25dd5bd3007772fe2cc69568cad2d882.
76130 [TransientBlobCache shutdown hook] INFO  
org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache
76130 [TaskExecutorLocalStateStoresManager shutdown hook] INFO  
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
Shutting down TaskExecutorLocalStateStoresManager.
76130 [flink-akka.actor.default-dispatcher-4] DEBUG 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
cbc357ccb763df2852fee8c4fc7d55f2 - 1 under allocation id 
25dd5bd3007772fe2cc69568cad2d882: 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@389f2d5c
76130 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
Custom Source -> Sink: Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532), 
deploy into slot with allocation id 25dd5bd3007772fe2cc69568cad2d882.
76130 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
Custom Source -> Sink: Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9), 
deploy into slot with allocation id 3315697ecf20a1249d7dad268892bcc9.
76131 [PermanentBlobCache shutdown hook] INFO  
org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting down BLOB cache
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from CREATED to 
DEPLOYING.
76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
Source: Custom Source -> Sink: Unnamed (3/4)#8739 
(497dd733deeb255e05bae82f0e41527d) [DEPLOYING].
76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
Source: Custom Source -> Sink: Unnamed (1/4)#8740 
(9132ba5f6b087654fb351138ce74e710) [DEPLOYING].
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream leak 
safety net for task Source: Custom Source -> Sink: Unnamed (4/4)#8739 
(639cf6da5847c7f4250839aeb2552df9) [DEPLOYING]
76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
for task 497dd733deeb255e05bae82f0e41527d at library cache manager took 0 
milliseconds
76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
for task 9132ba5f6b087654fb351138ce74e710 at library cache manager took 0 
milliseconds
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
Source: Custom Source -> Sink: Unnamed (4/4)#8739 
(639cf6da5847c7f4250839aeb2552df9) [DEPLOYING].
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
for task 639cf6da5847c7f4250839aeb2552df9 at library cache manager took 0 
milliseconds
76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
Source: Custom Source -> Sink: Unnamed (1/4)#8740 
(9132ba5f6b087654fb351138ce74e710) [DEPLOYING].
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
Source: Custom Source -> Sink: Unnamed (4/4)#8739 
(639cf6da5847c7f4250839aeb2552df9) [DEPLOYING].
76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
Source: Custom Source -> Sink: Unnamed (3/4)#8739 
(497dd733deeb255e05bae82f0e41527d) [DEPLOYING].
76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
been configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@7b20c610
76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
been configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@252878a9
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
been configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@30eb707b
76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
state.checkpoint-storage has not be set in the current sessions 
flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
strongly encouraged explicitly set this configuration so they understand how 
their applications are checkpointing snapshots for fault-tolerance.
76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is 
set to JobManager
76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
state.checkpoint-storage has not be set in the current sessions 
flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
strongly encouraged explicitly set this configuration so they understand how 
their applications are checkpointing snapshots for fault-tolerance.
76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is 
set to JobManager
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
state.checkpoint-storage has not be set in the current sessions 
flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
strongly encouraged explicitly set this configuration so they understand how 
their applications are checkpointing snapshots for fault-tolerance.
76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from CREATED to 
DEPLOYING.
76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is 
set to JobManager
76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream leak 
safety net for task Source: Custom Source -> Sink: Unnamed (2/4)#8740 
(7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING]
76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
Source: Custom Source -> Sink: Unnamed (2/4)#8740 
(7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING].
76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
for task 7384c32213a4e9cd3aa6ee5875b1e532 at library cache manager took 0 
milliseconds
76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
Source: Custom Source -> Sink: Unnamed (2/4)#8740 
(7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING].
76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
been configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c1406b8
76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
state.checkpoint-storage has not be set in the current sessions 
flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
strongly encouraged explicitly set this configuration so they understand how 
their applications are checkpointing snapshots for fault-tolerance.
76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is 
set to JobManager
76134 [FileChannelManagerImpl-io shutdown hook] INFO  
org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager 
removed spill file directory 
/var/folders/bd/6xl5m4z90j9438dv5bxg2n180000gn/T/junit2858931828265752695/junit7422691420779266555/flink-io-12fb17cf-712c-45f8-ab94-32fc7f5b5571
76136 [TaskExecutorLocalStateStoresManager shutdown hook] INFO  
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
Shutting down TaskExecutorLocalStateStoresManager.
76136 [IOManagerAsync shutdown hook] DEBUG 
org.apache.flink.runtime.io.disk.iomanager.IOManager [] - Shutting down I/O 
manager.
76136 [IOManagerAsync shutdown hook] DEBUG 
org.apache.flink.runtime.io.disk.iomanager.IOManager [] - Shutting down I/O 
manager.
76136 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from DEPLOYING 
to RUNNING.
76136 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from DEPLOYING 
to RUNNING.
76136 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
Custom Source -> Sink: Unnamed (4/4)#8739.
76136 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
Custom Source -> Sink: Unnamed (2/4)#8740.
76136 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from DEPLOYING 
to RUNNING.
76136 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from DEPLOYING 
to RUNNING.
76136 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
Custom Source -> Sink: Unnamed (1/4)#8740.
76136 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
Custom Source -> Sink: Unnamed (3/4)#8739.
76136 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (2/4) (7384c32213a4e9cd3aa6ee5875b1e532) switched from 
DEPLOYING to RUNNING.
76136 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (4/4) (639cf6da5847c7f4250839aeb2552df9) switched from 
DEPLOYING to RUNNING.
76136 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched from 
DEPLOYING to RUNNING.
76136 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (3/4) (497dd733deeb255e05bae82f0e41527d) switched from 
DEPLOYING to RUNNING.
76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
Custom Source -> Sink: Unnamed (3/4)#8739
76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
Custom Source -> Sink: Unnamed (4/4)#8739
76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
Custom Source -> Sink: Unnamed (1/4)#8740
76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating 
operator state backend for StreamSink_7df19f87deec5680128845fd9a6ca18d_(3/4) 
with empty state.
76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
Custom Source -> Sink: Unnamed (2/4)#8740
76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating 
operator state backend for StreamSink_7df19f87deec5680128845fd9a6ca18d_(1/4) 
with empty state.
76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating 
operator state backend for StreamSink_7df19f87deec5680128845fd9a6ca18d_(4/4) 
with empty state.
76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating 
operator state backend for StreamSink_7df19f87deec5680128845fd9a6ca18d_(2/4) 
with empty state.
76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] WARN  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from RUNNING to 
FAILED with failure cause: java.lang.RuntimeException: Test error. More 
instances than expected.
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:748)

76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] WARN  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from RUNNING to 
FAILED with failure cause: java.lang.RuntimeException: Test error. More 
instances than expected.
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:748)

76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] WARN  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from RUNNING to 
FAILED with failure cause: java.lang.RuntimeException: Test error. More 
instances than expected.
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:748)

76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
Source: Custom Source -> Sink: Unnamed (4/4)#8739 
(639cf6da5847c7f4250839aeb2552df9).
76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
Source -> Sink: Unnamed (4/4)#8739 network resources (state: FAILED).
76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
Source: Custom Source -> Sink: Unnamed (1/4)#8740 
(9132ba5f6b087654fb351138ce74e710).
76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
Source: Custom Source -> Sink: Unnamed (3/4)#8739 
(497dd733deeb255e05bae82f0e41527d).
76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
Source -> Sink: Unnamed (1/4)#8740 network resources (state: FAILED).
76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] WARN  
org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from RUNNING to 
FAILED with failure cause: java.lang.RuntimeException: Test error. More 
instances than expected.
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:748)

76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
Source -> Sink: Unnamed (3/4)#8739 network resources (state: FAILED).
76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
Source: Custom Source -> Sink: Unnamed (2/4)#8740 
(7384c32213a4e9cd3aa6ee5875b1e532).
76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
Source -> Sink: Unnamed (2/4)#8740 network resources (state: FAILED).
76137 [FileCache shutdown hook] INFO  
org.apache.flink.runtime.filecache.FileCache [] - removed file cache directory 
/var/folders/bd/6xl5m4z90j9438dv5bxg2n180000gn/T/junit2858931828265752695/junit7422691420779266555/flink-dist-cache-13391a9e-181b-4d3c-b373-0ac0203f301e
76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Ensuring all FileSystem streams 
are closed for task Source: Custom Source -> Sink: Unnamed (1/4)#8740 
(9132ba5f6b087654fb351138ce74e710) [FAILED]
76138 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
org.apache.flink.runtime.taskmanager.Task [] - Ensuring all FileSystem streams 
are closed for task Source: Custom Source -> Sink: Unnamed (2/4)#8740 
(7384c32213a4e9cd3aa6ee5875b1e532) [FAILED]
76138 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FAILED to JobManager for task Source: Custom 
Source -> Sink: Unnamed (1/4)#8740 9132ba5f6b087654fb351138ce74e710.
76138 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FAILED to JobManager for task Source: Custom 
Source -> Sink: Unnamed (2/4)#8740 7384c32213a4e9cd3aa6ee5875b1e532.
76138 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched from 
RUNNING to FAILED on 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ localhost 
(dataPort=-1).
java.lang.RuntimeException: Test error. More instances than expected.
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
 ~[test-classes/:?]
        at 
org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
 ~[test-classes/:?]
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
 ~[classes/:?]
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 ~[classes/:?]
        at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) 
~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580) 
~[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760) 
~[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
~[classes/:?]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
[...]
{quote}

  was:
[This 
build|https://dev.azure.com/mapohl/flink/_build/results?buildId=360&view=logs&j=e0582806-6d85-5dc5-7eb4-4289d3d0de6b&t=9fea6cf4-6ce3-5c26-d059-69f4d4cec7d1&l=4442]
 failed (not exclusively) due to 
{{ReactiveModelITCase.testScaleDownOnTaskManagerLoss}}.

I wasn't able to reproduce it locally with default and adaptive scheduler 
enabled.


> ReactiveModelITCase.testScaleDownOnTaskManagerLoss failed
> ---------------------------------------------------------
>
>                 Key: FLINK-21963
>                 URL: https://issues.apache.org/jira/browse/FLINK-21963
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination, Tests
>            Reporter: Matthias
>            Priority: Major
>              Labels: test-stability
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=360&view=logs&j=e0582806-6d85-5dc5-7eb4-4289d3d0de6b&t=9fea6cf4-6ce3-5c26-d059-69f4d4cec7d1&l=4442]
>  failed (not exclusively) due to 
> {{ReactiveModelITCase.testScaleDownOnTaskManagerLoss}}.
> I was able to reproduce it locally having the {{DefaultScheduler}} enabled. 
> The test seems to get into an infinite loop:
> {quote}
> [...]
> 76125 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state FAILED to JobManager for task Source: 
> Custom Source -> Sink: Unnamed (4/4)#8738 92b920a905c55fc85a76c79b3acef161.
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot aec00279d7404b26a104ee906695d27a.
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state Executing to Restarting.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink 
> Streaming Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state RUNNING 
> to CANCELLING.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (843d0c154f55a15a9bb1e705ae282032) switched 
> from RUNNING to CANCELING.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (ba0dd94db26abc376ee73522410b8094) switched 
> from RUNNING to CANCELING.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (92b920a905c55fc85a76c79b3acef161) switched 
> from RUNNING to CANCELING.
> 76126 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (ba0dd94db26abc376ee73522410b8094) switched 
> from CANCELING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (3/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot dde6780a1f8df3d0b1b1b454e28f8566.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
> 'newResourcesAvailable' because the actual state is Restarting and not 
> ResourceConsumer.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
> results produced by task execution ba0dd94db26abc376ee73522410b8094.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (843d0c154f55a15a9bb1e705ae282032) switched 
> from CANCELING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (2/4) - execution 
> #8739 to FAILED while being CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot 25dd5bd3007772fe2cc69568cad2d882.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
> 'newResourcesAvailable' because the actual state is Restarting and not 
> ResourceConsumer.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
> results produced by task execution 843d0c154f55a15a9bb1e705ae282032.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (92b920a905c55fc85a76c79b3acef161) switched 
> from CANCELING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (4/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot 3315697ecf20a1249d7dad268892bcc9.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
> 'newResourcesAvailable' because the actual state is Restarting and not 
> ResourceConsumer.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink 
> Streaming Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state 
> CANCELLING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ExecutionGraph 
> 4b5f437c7c47c8be9f8d8bf08e78910a reached terminal state CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping 
> checkpoint coordinator for job 4b5f437c7c47c8be9f8d8bf08e78910a.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
> results produced by task execution 92b920a905c55fc85a76c79b3acef161.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Ignoring 
> scheduled action because expected state 
> org.apache.flink.runtime.scheduler.adaptive.Executing@480dd446 is not the 
> actual state org.apache.flink.runtime.scheduler.adaptive.Restarting@64a59f58.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Declare new resource requirements for job 4b5f437c7c47c8be9f8d8bf08e78910a.
>       required resources: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=32768}]
>       acquired resources: 
> ResourceCounter{resources={ResourceProfile{UNKNOWN}=4}}
> 76126 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state Restarting to WaitingForResources.
> 76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76127 [SIGINT handler] WARN  org.apache.flink.util.TestSignalHandler [] - 
> RECEIVED SIGNAL 2: SIGINT. Shutting down as requested.
> 76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state WaitingForResources to CreatingExecutionGraph.
> 76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76127 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Running 
> initialization on master for job Flink Streaming Job 
> (4b5f437c7c47c8be9f8d8bf08e78910a).
> 76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76127 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - 
> Successfully ran initialization on master in 0 ms.
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Adding 1 
> vertices from job graph Flink Streaming Job 
> (4b5f437c7c47c8be9f8d8bf08e78910a).
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Attaching 1 
> topologically sorted vertices to existing job graph with 0 vertices and 0 
> intermediate results.
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Connecting 
> ExecutionJobVertex cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom Source -> 
> Sink: Unnamed) to 0 predecessors.
> 76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76127 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - 
> Built 4 pipelined regions in 0 ms
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - 
> Successfully created execution graph from job graph Flink Streaming Job 
> (4b5f437c7c47c8be9f8d8bf08e78910a).
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (3/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'numberOfFailedCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointSize'. Metric will not be reported.[localhost, jobmanager, 
> Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointDuration'. Metric will not be reported.[localhost, jobmanager, 
> Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointProcessedData'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (4/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointPersistedData'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointExternalPath'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76128 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - No state 
> backend has been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@90aee29
> 76128 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - The 
> configuration state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76128 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Checkpoint 
> storage is set to JobManager
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (2/4) - execution 
> #8739 to FAILED while being CANCELED.
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'restartingTime'. Metric will not be reported.[localhost, jobmanager, Flink 
> Streaming Job]
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'downtime'. Metric will not be reported.[localhost, jobmanager, Flink 
> Streaming Job]
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 'uptime'. 
> Metric will not be reported.[localhost, jobmanager, Flink Streaming Job]
> 76128 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Status of the 
> shared state registry of job 4b5f437c7c47c8be9f8d8bf08e78910a after restore: 
> SharedStateRegistry{registeredStates={}}.
> 76128 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint 
> found during restore.
> 76128 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Resetting the 
> master hooks.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink 
> Streaming Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state CREATED 
> to RUNNING.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id aec00279d7404b26a104ee906695d27a.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{88b040e446ee408f792334a2ec437a42})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id 25dd5bd3007772fe2cc69568cad2d882.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{96939e4bb685186000b4001d96082081})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id dde6780a1f8df3d0b1b1b454e28f8566.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{c879c9aa7a7cb6dfbc12502ce7a8ed12})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id 3315697ecf20a1249d7dad268892bcc9.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{53a25924bcd9fe2db23c22e0bf17effe})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - 
> Successfully reserved and assigned the required slots for the ExecutionGraph.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state CreatingExecutionGraph to Executing.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (1/4) (attempt #8740) with attempt id 
> 9132ba5f6b087654fb351138ce74e710 to 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ 
> localhost (dataPort=-1) with allocation id aec00279d7404b26a104ee906695d27a
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (7384c32213a4e9cd3aa6ee5875b1e532) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (2/4) (attempt #8740) with attempt id 
> 7384c32213a4e9cd3aa6ee5875b1e532 to 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ 
> localhost (dataPort=-1) with allocation id 25dd5bd3007772fe2cc69568cad2d882
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (497dd733deeb255e05bae82f0e41527d) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (3/4) (attempt #8739) with attempt id 
> 497dd733deeb255e05bae82f0e41527d to 6cf04c09-5378-4bf3-aedd-e9a52076ec99 @ 
> localhost (dataPort=-1) with allocation id dde6780a1f8df3d0b1b1b454e28f8566
> 76128 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot aec00279d7404b26a104ee906695d27a.
> 76128 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot dde6780a1f8df3d0b1b1b454e28f8566.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (639cf6da5847c7f4250839aeb2552df9) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (4/4) (attempt #8739) with attempt id 
> 639cf6da5847c7f4250839aeb2552df9 to 6cf04c09-5378-4bf3-aedd-e9a52076ec99 @ 
> localhost (dataPort=-1) with allocation id 3315697ecf20a1249d7dad268892bcc9
> 76128 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 2 under allocation id 
> dde6780a1f8df3d0b1b1b454e28f8566: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@6d90093b
> 76128 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 0 under allocation id 
> aec00279d7404b26a104ee906695d27a: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@64e96e04
> 76128 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d), 
> deploy into slot with allocation id dde6780a1f8df3d0b1b1b454e28f8566.
> 76129 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot 3315697ecf20a1249d7dad268892bcc9.
> 76129 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710), 
> deploy into slot with allocation id aec00279d7404b26a104ee906695d27a.
> 76129 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from CREATED 
> to DEPLOYING.
> 76129 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d) [DEPLOYING]
> 76129 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 3 under allocation id 
> 3315697ecf20a1249d7dad268892bcc9: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@23a8b928
> 76130 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from CREATED 
> to DEPLOYING.
> 76130 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [DEPLOYING]
> 76130 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot 25dd5bd3007772fe2cc69568cad2d882.
> 76130 [TransientBlobCache shutdown hook] INFO  
> org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache
> 76130 [TaskExecutorLocalStateStoresManager shutdown hook] INFO  
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
> Shutting down TaskExecutorLocalStateStoresManager.
> 76130 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 1 under allocation id 
> 25dd5bd3007772fe2cc69568cad2d882: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@389f2d5c
> 76130 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532), 
> deploy into slot with allocation id 25dd5bd3007772fe2cc69568cad2d882.
> 76130 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9), 
> deploy into slot with allocation id 3315697ecf20a1249d7dad268892bcc9.
> 76131 [PermanentBlobCache shutdown hook] INFO  
> org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting down BLOB cache
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from CREATED 
> to DEPLOYING.
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9) [DEPLOYING]
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 497dd733deeb255e05bae82f0e41527d at library cache manager took 0 
> milliseconds
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 9132ba5f6b087654fb351138ce74e710 at library cache manager took 0 
> milliseconds
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 639cf6da5847c7f4250839aeb2552df9 at library cache manager took 0 
> milliseconds
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@7b20c610
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@252878a9
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@30eb707b
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from CREATED 
> to DEPLOYING.
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING]
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 7384c32213a4e9cd3aa6ee5875b1e532 at library cache manager took 0 
> milliseconds
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING].
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c1406b8
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76134 [FileChannelManagerImpl-io shutdown hook] INFO  
> org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - 
> FileChannelManager removed spill file directory 
> /var/folders/bd/6xl5m4z90j9438dv5bxg2n180000gn/T/junit2858931828265752695/junit7422691420779266555/flink-io-12fb17cf-712c-45f8-ab94-32fc7f5b5571
> 76136 [TaskExecutorLocalStateStoresManager shutdown hook] INFO  
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
> Shutting down TaskExecutorLocalStateStoresManager.
> 76136 [IOManagerAsync shutdown hook] DEBUG 
> org.apache.flink.runtime.io.disk.iomanager.IOManager [] - Shutting down I/O 
> manager.
> 76136 [IOManagerAsync shutdown hook] DEBUG 
> org.apache.flink.runtime.io.disk.iomanager.IOManager [] - Shutting down I/O 
> manager.
> 76136 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (4/4)#8739.
> 76136 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740.
> 76136 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740.
> 76136 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (3/4)#8739.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (7384c32213a4e9cd3aa6ee5875b1e532) switched 
> from DEPLOYING to RUNNING.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (639cf6da5847c7f4250839aeb2552df9) switched 
> from DEPLOYING to RUNNING.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched 
> from DEPLOYING to RUNNING.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (497dd733deeb255e05bae82f0e41527d) switched 
> from DEPLOYING to RUNNING.
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (3/4)#8739
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (4/4)#8739
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(3/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(1/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(4/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(2/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9).
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (4/4)#8739 network resources (state: FAILED).
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710).
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d).
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (1/4)#8740 network resources (state: FAILED).
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (3/4)#8739 network resources (state: FAILED).
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532).
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (2/4)#8740 network resources (state: FAILED).
> 76137 [FileCache shutdown hook] INFO  
> org.apache.flink.runtime.filecache.FileCache [] - removed file cache 
> directory 
> /var/folders/bd/6xl5m4z90j9438dv5bxg2n180000gn/T/junit2858931828265752695/junit7422691420779266555/flink-dist-cache-13391a9e-181b-4d3c-b373-0ac0203f301e
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Ensuring all FileSystem 
> streams are closed for task Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [FAILED]
> 76138 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Ensuring all FileSystem 
> streams are closed for task Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [FAILED]
> 76138 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state FAILED to JobManager for task Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740 9132ba5f6b087654fb351138ce74e710.
> 76138 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state FAILED to JobManager for task Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740 7384c32213a4e9cd3aa6ee5875b1e532.
> 76138 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched 
> from RUNNING to FAILED on 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ localhost 
> (dataPort=-1).
> java.lang.RuntimeException: Test error. More instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>  ~[test-classes/:?]
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>  ~[test-classes/:?]
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) 
> ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>  ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760) 
> ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> ~[classes/:?]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
> [...]
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to