[ 
https://issues.apache.org/jira/browse/FLINK-10735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fei Feng updated FLINK-10735:
-----------------------------
    Description: 
Flink on YARN in detached mode: when a Flink job is cancelled, YARN resources 
are released very slowly!

If the job fails and restarts continuously, it acquires more and more 
containers until the resources are used up.

Log:

18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 
32F01A0FC50EFE8F4794AD0C45678EC4: xxx switched from state RUNNING to CANCELLING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) 
(0807b5f291f897ac4545dbfdb8ec3448) switched from RUNNING to CANCELING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) 
(a56a70eb6807dacf18fbf272ee6160e2) switched from RUNNING to CANCELING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) 
(a2eca3dc06087dfdaf3fd0200e545cc4) switched from RUNNING to CANCELING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: 
(TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: 
(SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, 
SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, 
EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) 
(9fa3592c74eda97124a033b6afea6c87) switched from RUNNING to CANCELING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from RUNNING 
to CANCELING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from RUNNING 
to CANCELING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from RUNNING 
to CANCELING.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) 
(a2eca3dc06087dfdaf3fd0200e545cc4) switched from CANCELING to CANCELED.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from CANCELING 
to CANCELED.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) 
(0807b5f291f897ac4545dbfdb8ec3448) switched from CANCELING to CANCELED.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) 
(a56a70eb6807dacf18fbf272ee6160e2) switched from CANCELING to CANCELED.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from CANCELING 
to CANCELED.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: 
(TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: 
(SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, 
SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, 
EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) 
(9fa3592c74eda97124a033b6afea6c87) switched from CANCELING to CANCELED.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from CANCELING 
to CANCELED.
 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 
32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
 select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
 sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), 
sum(ss_net_profit)
 from store_sales
 where ss_coupon_amt > 0
 group by tumble(rowtime, interval '1' hour)
 having sum(ss_net_profit) > 1000 (f0086980bd43a077dcfabd76e996f422) switched 
from state CANCELLING to CANCELED.
 18/10/31 19:43:59 INFO checkpoint.CheckpointCoordinator: Stopping checkpoint 
coordinator for job f0086980bd43a077dcfabd76e996f422.
 18/10/31 19:43:59 INFO checkpoint.StandaloneCompletedCheckpointStore: Shutting 
down
 18/10/31 19:43:59 INFO dispatcher.StandaloneDispatcher: Job 
f0086980bd43a077dcfabd76e996f422 reached globally terminal state CANCELED.
 18/10/31 19:43:59 INFO jobmaster.JobMaster: Stopping the JobMaster for job 
32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
 select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
 sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), 
sum(ss_net_profit)
 from store_sales
 where ss_coupon_amt > 0
 group by tumble(rowtime, interval '1' hour)
 having sum(ss_net_profit) > 1000(f0086980bd43a077dcfabd76e996f422).
 18/10/31 19:43:59 INFO jobmaster.JobMaster: Close ResourceManager connection 
39b6f62c8f9ef7681eb9f284d893ad0c: JobManager is shutting down..
 18/10/31 19:43:59 INFO yarn.YarnResourceManager: Disconnect job manager 
00000000000000000000000000000...@akka.tcp://flink@node2:43683/user/jobmanager_13
 for job f0086980bd43a077dcfabd76e996f422 from the resource manager.
 18/10/31 19:44:00 INFO slotpool.SlotPool: Suspending SlotPool.
 18/10/31 19:44:00 INFO slotpool.SlotPool: Stopping SlotPool.
 18/10/31 19:44:00 INFO jobmaster.JobManagerRunner: JobManagerRunner already 
shutdown.
 18/10/31 19:44:01 INFO yarn.YarnResourceManager: Stopping container 
container_1538963842459_0857_01_000017.
 18/10/31 19:44:01 WARN yarn.YarnResourceManager: Error while calling YARN Node 
Manager to stop container
 org.apache.hadoop.yarn.exceptions.YarnException: Container 
container_1538963842459_0857_01_000017 is not handled by this NodeManager
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
 at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152)
 at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
 at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297)
 at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247)
 at 
org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:307)
 at 
org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:74)
 at 
org.apache.flink.runtime.resourcemanager.ResourceManager.releaseResource(ResourceManager.java:872)
 at 
org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.releaseResource(ResourceManager.java:1077)
 at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.checkTaskManagerTimeouts(SlotManager.java:914)
 at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.lambda$null$0(SlotManager.java:195)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
 at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
 at akka.actor.ActorCell.invoke(ActorCell.scala:495)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 18/10/31 19:44:01 INFO yarn.YarnResourceManager: Closing TaskExecutor 
connection container_1538963842459_0857_01_000017 because: TaskExecutor 
exceeded the idle timeout.
 18/10/31 19:44:01 WARN yarn.YarnResourceManager: Discard registration from 
TaskExecutor container_1538963842459_0857_01_000017 at 
(akka.tcp://flink@node9:45391/user/taskmanager_0) because the framework did not 
recognize it

  was:
Flink on YARN in detached mode: when a Flink job is cancelled, YARN resources 
are released very slowly

Log:

18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 
32F01A0FC50EFE8F4794AD0C45678EC4: xxx switched from state RUNNING to CANCELLING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) 
(0807b5f291f897ac4545dbfdb8ec3448) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) 
(a56a70eb6807dacf18fbf272ee6160e2) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) 
(a2eca3dc06087dfdaf3fd0200e545cc4) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: 
(TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: 
(SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, 
SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, 
EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) 
(9fa3592c74eda97124a033b6afea6c87) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from RUNNING 
to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from RUNNING 
to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from RUNNING 
to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) 
(a2eca3dc06087dfdaf3fd0200e545cc4) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from CANCELING 
to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) 
(0807b5f291f897ac4545dbfdb8ec3448) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, 
SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, 
SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, 
SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, 
SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> 
where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, 
SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) 
(a56a70eb6807dacf18fbf272ee6160e2) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from CANCELING 
to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: 
(TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: 
(SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, 
SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, 
EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) 
(9fa3592c74eda97124a033b6afea6c87) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from CANCELING 
to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 
32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
        sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), 
sum(ss_net_profit)
from store_sales
where ss_coupon_amt > 0
group by tumble(rowtime, interval '1' hour)
having sum(ss_net_profit) > 1000 (f0086980bd43a077dcfabd76e996f422) switched 
from state CANCELLING to CANCELED.
18/10/31 19:43:59 INFO checkpoint.CheckpointCoordinator: Stopping checkpoint 
coordinator for job f0086980bd43a077dcfabd76e996f422.
18/10/31 19:43:59 INFO checkpoint.StandaloneCompletedCheckpointStore: Shutting 
down
18/10/31 19:43:59 INFO dispatcher.StandaloneDispatcher: Job 
f0086980bd43a077dcfabd76e996f422 reached globally terminal state CANCELED.
18/10/31 19:43:59 INFO jobmaster.JobMaster: Stopping the JobMaster for job 
32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
        sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), 
sum(ss_net_profit)
from store_sales
where ss_coupon_amt > 0
group by tumble(rowtime, interval '1' hour)
having sum(ss_net_profit) > 1000(f0086980bd43a077dcfabd76e996f422).
18/10/31 19:43:59 INFO jobmaster.JobMaster: Close ResourceManager connection 
39b6f62c8f9ef7681eb9f284d893ad0c: JobManager is shutting down..
18/10/31 19:43:59 INFO yarn.YarnResourceManager: Disconnect job manager 
00000000000000000000000000000...@akka.tcp://flink@node2:43683/user/jobmanager_13
 for job f0086980bd43a077dcfabd76e996f422 from the resource manager.
18/10/31 19:44:00 INFO slotpool.SlotPool: Suspending SlotPool.
18/10/31 19:44:00 INFO slotpool.SlotPool: Stopping SlotPool.
18/10/31 19:44:00 INFO jobmaster.JobManagerRunner: JobManagerRunner already 
shutdown.
18/10/31 19:44:01 INFO yarn.YarnResourceManager: Stopping container 
container_1538963842459_0857_01_000017.
18/10/31 19:44:01 WARN yarn.YarnResourceManager: Error while calling YARN Node 
Manager to stop container
org.apache.hadoop.yarn.exceptions.YarnException: Container 
container_1538963842459_0857_01_000017 is not handled by this NodeManager
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152)
        at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
        at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297)
        at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247)
        at 
org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:307)
        at 
org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:74)
        at 
org.apache.flink.runtime.resourcemanager.ResourceManager.releaseResource(ResourceManager.java:872)
        at 
org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.releaseResource(ResourceManager.java:1077)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.checkTaskManagerTimeouts(SlotManager.java:914)
        at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.lambda$null$0(SlotManager.java:195)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
18/10/31 19:44:01 INFO yarn.YarnResourceManager: Closing TaskExecutor 
connection container_1538963842459_0857_01_000017 because: TaskExecutor 
exceeded the idle timeout.
18/10/31 19:44:01 WARN yarn.YarnResourceManager: Discard registration from 
TaskExecutor container_1538963842459_0857_01_000017 at 
(akka.tcp://flink@node9:45391/user/taskmanager_0) because the framework did not 
recognize it


> flink on yarn close container exception
> ---------------------------------------
>
>                 Key: FLINK-10735
>                 URL: https://issues.apache.org/jira/browse/FLINK-10735
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager, TaskManager, YARN
>    Affects Versions: 1.6.2
>         Environment: Hadoop 2.7
> Flink 1.6.2
>            Reporter: Fei Feng
>            Priority: Critical
>
> Flink on YARN in detached mode: when a Flink job is cancelled, YARN 
> resources are released very slowly!
> If the job fails and restarts continuously, it acquires more and more 
> containers until the resources are used up.
> Log:
> 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 
> 32F01A0FC50EFE8F4794AD0C45678EC4: xxx switched from state RUNNING to 
> CANCELLING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
> Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
> SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, 
> SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
> SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, 
> SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, 
> SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: 
> (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, 
> SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 
> 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, 
> SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) 
> (0807b5f291f897ac4545dbfdb8ec3448) switched from RUNNING to CANCELING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
> Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
> SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, 
> SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
> SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, 
> SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, 
> SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: 
> (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, 
> SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 
> 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, 
> SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) 
> (a56a70eb6807dacf18fbf272ee6160e2) switched from RUNNING to CANCELING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
> Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
> SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, 
> SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
> SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, 
> SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, 
> SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: 
> (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, 
> SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 
> 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, 
> SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) 
> (a2eca3dc06087dfdaf3fd0200e545cc4) switched from RUNNING to CANCELING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: 
> (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: 
> (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, 
> SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS 
> w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
> w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, 
> EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) 
> (9fa3592c74eda97124a033b6afea6c87) switched from RUNNING to CANCELING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
> JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
> RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from RUNNING 
> to CANCELING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
> JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
> RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from RUNNING 
> to CANCELING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
> JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
> RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from RUNNING 
> to CANCELING.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
> Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
> SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, 
> SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
> SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, 
> SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, 
> SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: 
> (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, 
> SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 
> 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, 
> SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) 
> (a2eca3dc06087dfdaf3fd0200e545cc4) switched from CANCELING to CANCELED.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
> JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
> RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from 
> CANCELING to CANCELED.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
> Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
> SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, 
> SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
> SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, 
> SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, 
> SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: 
> (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, 
> SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 
> 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, 
> SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) 
> (0807b5f291f897ac4545dbfdb8ec3448) switched from CANCELING to CANCELED.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: 
> Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, 
> SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, 
> SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, 
> SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, 
> SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, 
> SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: 
> (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, 
> SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 
> 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, 
> SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) 
> (a56a70eb6807dacf18fbf272ee6160e2) switched from CANCELING to CANCELED.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
> JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
> RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from 
> CANCELING to CANCELED.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: 
> (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: 
> (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, 
> SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS 
> w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
> w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, 
> EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) 
> (9fa3592c74eda97124a033b6afea6c87) switched from CANCELING to CANCELED.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: 
> JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, 
> RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from 
> CANCELING to CANCELED.
>  18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 
> 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
>  select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
>  sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), 
> sum(ss_net_profit)
>  from store_sales
>  where ss_coupon_amt > 0
>  group by tumble(rowtime, interval '1' hour)
>  having sum(ss_net_profit) > 1000 (f0086980bd43a077dcfabd76e996f422) switched 
> from state CANCELLING to CANCELED.
>  18/10/31 19:43:59 INFO checkpoint.CheckpointCoordinator: Stopping checkpoint 
> coordinator for job f0086980bd43a077dcfabd76e996f422.
>  18/10/31 19:43:59 INFO checkpoint.StandaloneCompletedCheckpointStore: 
> Shutting down
>  18/10/31 19:43:59 INFO dispatcher.StandaloneDispatcher: Job 
> f0086980bd43a077dcfabd76e996f422 reached globally terminal state CANCELED.
>  18/10/31 19:43:59 INFO jobmaster.JobMaster: Stopping the JobMaster for job 
> 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
>  select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
>  sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), 
> sum(ss_net_profit)
>  from store_sales
>  where ss_coupon_amt > 0
>  group by tumble(rowtime, interval '1' hour)
>  having sum(ss_net_profit) > 1000(f0086980bd43a077dcfabd76e996f422).
>  18/10/31 19:43:59 INFO jobmaster.JobMaster: Close ResourceManager connection 
> 39b6f62c8f9ef7681eb9f284d893ad0c: JobManager is shutting down..
>  18/10/31 19:43:59 INFO yarn.YarnResourceManager: Disconnect job manager 
> 00000000000000000000000000000...@akka.tcp://flink@node2:43683/user/jobmanager_13
>  for job f0086980bd43a077dcfabd76e996f422 from the resource manager.
>  18/10/31 19:44:00 INFO slotpool.SlotPool: Suspending SlotPool.
>  18/10/31 19:44:00 INFO slotpool.SlotPool: Stopping SlotPool.
>  18/10/31 19:44:00 INFO jobmaster.JobManagerRunner: JobManagerRunner already 
> shutdown.
>  18/10/31 19:44:01 INFO yarn.YarnResourceManager: Stopping container 
> container_1538963842459_0857_01_000017.
>  18/10/31 19:44:01 WARN yarn.YarnResourceManager: Error while calling YARN 
> Node Manager to stop container
>  org.apache.hadoop.yarn.exceptions.YarnException: Container 
> container_1538963842459_0857_01_000017 is not handled by this NodeManager
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>  at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152)
>  at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>  at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297)
>  at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247)
>  at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:307)
>  at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:74)
>  at org.apache.flink.runtime.resourcemanager.ResourceManager.releaseResource(ResourceManager.java:872)
>  at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.releaseResource(ResourceManager.java:1077)
>  at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.checkTaskManagerTimeouts(SlotManager.java:914)
>  at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.lambda$null$0(SlotManager.java:195)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  18/10/31 19:44:01 INFO yarn.YarnResourceManager: Closing TaskExecutor 
> connection container_1538963842459_0857_01_000017 because: TaskExecutor 
> exceeded the idle timeout.
>  18/10/31 19:44:01 WARN yarn.YarnResourceManager: Discard registration from 
> TaskExecutor container_1538963842459_0857_01_000017 at 
> (akka.tcp://flink@node9:45391/user/taskmanager_0) because the framework did 
> not recognize it



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
