[ https://issues.apache.org/jira/browse/FLINK-10735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fei Feng updated FLINK-10735:
-----------------------------
Description:
When running Flink on YARN in detached mode and cancelling a Flink job, YARN resources are released very slowly. If the job fails and restarts continuously, it acquires more and more containers until the resources are used up.

Log:
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 32F01A0FC50EFE8F4794AD0C45678EC4: xxx switched from state RUNNING to CANCELLING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) (0807b5f291f897ac4545dbfdb8ec3448) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) (a56a70eb6807dacf18fbf272ee6160e2) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) (a2eca3dc06087dfdaf3fd0200e545cc4) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) (9fa3592c74eda97124a033b6afea6c87) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from RUNNING to CANCELING.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) (a2eca3dc06087dfdaf3fd0200e545cc4) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) (0807b5f291f897ac4545dbfdb8ec3448) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) (a56a70eb6807dacf18fbf272ee6160e2) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) (9fa3592c74eda97124a033b6afea6c87) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart, sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), sum(ss_net_profit)
from store_sales
where ss_coupon_amt > 0
group by tumble(rowtime, interval '1' hour)
having sum(ss_net_profit) > 1000 (f0086980bd43a077dcfabd76e996f422) switched from state CANCELLING to CANCELED.
18/10/31 19:43:59 INFO checkpoint.CheckpointCoordinator: Stopping checkpoint coordinator for job f0086980bd43a077dcfabd76e996f422.
18/10/31 19:43:59 INFO checkpoint.StandaloneCompletedCheckpointStore: Shutting down
18/10/31 19:43:59 INFO dispatcher.StandaloneDispatcher: Job f0086980bd43a077dcfabd76e996f422 reached globally terminal state CANCELED.
18/10/31 19:43:59 INFO jobmaster.JobMaster: Stopping the JobMaster for job 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart, sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), sum(ss_net_profit)
from store_sales
where ss_coupon_amt > 0
group by tumble(rowtime, interval '1' hour)
having sum(ss_net_profit) > 1000(f0086980bd43a077dcfabd76e996f422).
18/10/31 19:43:59 INFO jobmaster.JobMaster: Close ResourceManager connection 39b6f62c8f9ef7681eb9f284d893ad0c: JobManager is shutting down..
18/10/31 19:43:59 INFO yarn.YarnResourceManager: Disconnect job manager 00000000000000000000000000000...@akka.tcp://flink@node2:43683/user/jobmanager_13 for job f0086980bd43a077dcfabd76e996f422 from the resource manager.
18/10/31 19:44:00 INFO slotpool.SlotPool: Suspending SlotPool.
18/10/31 19:44:00 INFO slotpool.SlotPool: Stopping SlotPool.
18/10/31 19:44:00 INFO jobmaster.JobManagerRunner: JobManagerRunner already shutdown.
18/10/31 19:44:01 INFO yarn.YarnResourceManager: Stopping container container_1538963842459_0857_01_000017.
18/10/31 19:44:01 WARN yarn.YarnResourceManager: Error while calling YARN Node Manager to stop container org.apache.hadoop.yarn.exceptions.YarnException: Container container_1538963842459_0857_01_000017 is not handled by this NodeManager at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247) at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:307) at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:74) at org.apache.flink.runtime.resourcemanager.ResourceManager.releaseResource(ResourceManager.java:872) at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.releaseResource(ResourceManager.java:1077) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.checkTaskManagerTimeouts(SlotManager.java:914) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.lambda$null$0(SlotManager.java:195) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 18/10/31 19:44:01 INFO yarn.YarnResourceManager: Closing TaskExecutor connection container_1538963842459_0857_01_000017 because: TaskExecutor exceeded the idle timeout. 18/10/31 19:44:01 WARN yarn.YarnResourceManager: Discard registration from TaskExecutor container_1538963842459_0857_01_000017 at (akka.tcp://flink@node9:45391/user/taskmanager_0) because the framework did not recognize it was: flink on yarn with detached mode, when cancle flink job,yarn resource release very slow Log: 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 32F01A0FC50EFE8F4794AD0C45678EC4: xxx switched from state RUNNING to CANCELLING. 
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) (0807b5f291f897ac4545dbfdb8ec3448) switched from RUNNING to CANCELING. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) (a56a70eb6807dacf18fbf272ee6160e2) switched from RUNNING to CANCELING. 
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) (a2eca3dc06087dfdaf3fd0200e545cc4) switched from RUNNING to CANCELING. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) (9fa3592c74eda97124a033b6afea6c87) switched from RUNNING to CANCELING. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from RUNNING to CANCELING. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from RUNNING to CANCELING. 
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from RUNNING to CANCELING. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) (a2eca3dc06087dfdaf3fd0200e545cc4) switched from CANCELING to CANCELED. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from CANCELING to CANCELED. 
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) (0807b5f291f897ac4545dbfdb8ec3448) switched from CANCELING to CANCELED. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) (a56a70eb6807dacf18fbf272ee6160e2) switched from CANCELING to CANCELED. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from CANCELING to CANCELED. 
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) (9fa3592c74eda97124a033b6afea6c87) switched from CANCELING to CANCELED. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from CANCELING to CANCELED. 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2 select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart, sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), sum(ss_net_profit) from store_sales where ss_coupon_amt > 0 group by tumble(rowtime, interval '1' hour) having sum(ss_net_profit) > 1000 (f0086980bd43a077dcfabd76e996f422) switched from state CANCELLING to CANCELED. 18/10/31 19:43:59 INFO checkpoint.CheckpointCoordinator: Stopping checkpoint coordinator for job f0086980bd43a077dcfabd76e996f422. 18/10/31 19:43:59 INFO checkpoint.StandaloneCompletedCheckpointStore: Shutting down 18/10/31 19:43:59 INFO dispatcher.StandaloneDispatcher: Job f0086980bd43a077dcfabd76e996f422 reached globally terminal state CANCELED. 
18/10/31 19:43:59 INFO jobmaster.JobMaster: Stopping the JobMaster for job 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2 select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart, sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid), sum(ss_net_profit) from store_sales where ss_coupon_amt > 0 group by tumble(rowtime, interval '1' hour) having sum(ss_net_profit) > 1000(f0086980bd43a077dcfabd76e996f422). 18/10/31 19:43:59 INFO jobmaster.JobMaster: Close ResourceManager connection 39b6f62c8f9ef7681eb9f284d893ad0c: JobManager is shutting down.. 18/10/31 19:43:59 INFO yarn.YarnResourceManager: Disconnect job manager 00000000000000000000000000000...@akka.tcp://flink@node2:43683/user/jobmanager_13 for job f0086980bd43a077dcfabd76e996f422 from the resource manager. 18/10/31 19:44:00 INFO slotpool.SlotPool: Suspending SlotPool. 18/10/31 19:44:00 INFO slotpool.SlotPool: Stopping SlotPool. 18/10/31 19:44:00 INFO jobmaster.JobManagerRunner: JobManagerRunner already shutdown. 18/10/31 19:44:01 INFO yarn.YarnResourceManager: Stopping container container_1538963842459_0857_01_000017. 
18/10/31 19:44:01 WARN yarn.YarnResourceManager: Error while calling YARN Node Manager to stop container org.apache.hadoop.yarn.exceptions.YarnException: Container container_1538963842459_0857_01_000017 is not handled by this NodeManager at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247) at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:307) at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:74) at org.apache.flink.runtime.resourcemanager.ResourceManager.releaseResource(ResourceManager.java:872) at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.releaseResource(ResourceManager.java:1077) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.checkTaskManagerTimeouts(SlotManager.java:914) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.lambda$null$0(SlotManager.java:195) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 18/10/31 19:44:01 INFO yarn.YarnResourceManager: Closing TaskExecutor connection container_1538963842459_0857_01_000017 because: TaskExecutor exceeded the idle timeout. 18/10/31 19:44:01 WARN yarn.YarnResourceManager: Discard registration from TaskExecutor container_1538963842459_0857_01_000017 at (akka.tcp://flink@node9:45391/user/taskmanager_0) because the framework did not recognize it > flink on yarn close container exception > --------------------------------------- > > Key: FLINK-10735 > URL: https://issues.apache.org/jira/browse/FLINK-10735 > Project: Flink > Issue Type: Bug > Components: ResourceManager, TaskManager, YARN > Affects Versions: 1.6.2 > Environment: Hadoop 2.7 > flink 1.6.2 > Reporter: Fei Feng > Priority: Critical > > flink on yarn with detached mode, when cancle flink job,yarn resource release > very slow! > if job failed and continouslly restart , it will get more and more container > until the resource is used up。 > Log: > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job > 32F01A0FC50EFE8F4794AD0C45678EC4: xxx switched from state RUNNING to > CANCELLING. 
> 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: > Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, > SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, > SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, > SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, > SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, > SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: > (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, > SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, > 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, > SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) > (0807b5f291f897ac4545dbfdb8ec3448) switched from RUNNING to CANCELING. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: > Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, > SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, > SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, > SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, > SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, > SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: > (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, > SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, > 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, > SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) > (a56a70eb6807dacf18fbf272ee6160e2) switched from RUNNING to CANCELING. 
> 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: > Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, > SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, > SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, > SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, > SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, > SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: > (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, > SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, > 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, > SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) > (a2eca3dc06087dfdaf3fd0200e545cc4) switched from RUNNING to CANCELING. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: > (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: > (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, > SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS > w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS > w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, > EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) > (9fa3592c74eda97124a033b6afea6c87) switched from RUNNING to CANCELING. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: > JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, > RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from RUNNING > to CANCELING. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: > JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, > RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from RUNNING > to CANCELING. 
> 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: > JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, > RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from RUNNING > to CANCELING. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: > Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, > SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, > SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, > SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, > SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, > SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: > (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, > SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, > 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, > SS_NET_PROFIT) -> time attribute: (ROWTIME) (3/3) > (a2eca3dc06087dfdaf3fd0200e545cc4) switched from CANCELING to CANCELED. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: > JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, > RS_NET_PROFIT) (1/3) (6d4f413ace1206714566b610e5bf47b6) switched from > CANCELING to CANCELED. 
> 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: > Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, > SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, > SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, > SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, > SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, > SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: > (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, > SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, > 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, > SS_NET_PROFIT) -> time attribute: (ROWTIME) (1/3) > (0807b5f291f897ac4545dbfdb8ec3448) switched from CANCELING to CANCELED. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Source: > Kafka09TableSource(SS_SOLD_DATE_SK, SS_SOLD_TIME_SK, SS_ITEM_SK, > SS_CUSTOMER_SK, SS_CDEMO_SK, SS_HDEMO_SK, SS_ADDR_SK, SS_STORE_SK, > SS_PROMO_SK, SS_TICKET_NUMBER, SS_QUANTITY, SS_WHOLESALE_COST, SS_LIST_PRICE, > SS_SALES_PRICE, SS_EXT_DISCOUNT_AMT, SS_EXT_SALES_PRICE, > SS_EXT_WHOLESALE_COST, SS_EXT_LIST_PRICE, SS_EXT_TAX, SS_COUPON_AMT, > SS_NET_PAID, SS_NET_PAID_INC_TAX, SS_NET_PROFIT, ROWTIME) -> from: > (SS_WHOLESALE_COST, SS_SALES_PRICE, SS_COUPON_AMT, SS_NET_PAID, > SS_NET_PROFIT, ROWTIME) -> Timestamps/Watermarks -> where: (>(SS_COUPON_AMT, > 0)), select: (ROWTIME, SS_WHOLESALE_COST, SS_SALES_PRICE, SS_NET_PAID, > SS_NET_PROFIT) -> time attribute: (ROWTIME) (2/3) > (a56a70eb6807dacf18fbf272ee6160e2) switched from CANCELING to CANCELED. > 18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: > JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, > RS_NET_PROFIT) (2/3) (8871efc7c19abae7f833e02aa1d2107d) switched from > CANCELING to CANCELED. 
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: window: (TumblingGroupWindow('w$, 'ROWTIME, 3600000.millis)), select: (SUM(SS_WHOLESALE_COST) AS EXPR$1, SUM(SS_SALES_PRICE) AS EXPR$2, SUM(SS_NET_PAID) AS EXPR$3, SUM(SS_NET_PROFIT) AS EXPR$4, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (>(EXPR$4, 1000)), select: (CAST(w$start) AS WSTART, EXPR$1, EXPR$2, EXPR$3, EXPR$4) -> to: Row (1/1) (9fa3592c74eda97124a033b6afea6c87) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Sink: JDBCAppendTableSink(RS_DAY, RS_WHOLESALE_COST, RS_SALES_PRICE, RS_NET_PAID, RS_NET_PROFIT) (3/3) (fcaf0b9247d60a31e3381d70871d929f) switched from CANCELING to CANCELED.
18/10/31 19:43:59 INFO executiongraph.ExecutionGraph: Job 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid),
sum(ss_net_profit)
from store_sales
where ss_coupon_amt > 0
group by tumble(rowtime, interval '1' hour)
having sum(ss_net_profit) > 1000 (f0086980bd43a077dcfabd76e996f422) switched from state CANCELLING to CANCELED.
18/10/31 19:43:59 INFO checkpoint.CheckpointCoordinator: Stopping checkpoint coordinator for job f0086980bd43a077dcfabd76e996f422.
18/10/31 19:43:59 INFO checkpoint.StandaloneCompletedCheckpointStore: Shutting down
18/10/31 19:43:59 INFO dispatcher.StandaloneDispatcher: Job f0086980bd43a077dcfabd76e996f422 reached globally terminal state CANCELED.
18/10/31 19:43:59 INFO jobmaster.JobMaster: Stopping the JobMaster for job 32F01A0FC50EFE8F4794AD0C45678EC4: insert into Result2
select cast(tumble_start(rowtime, interval '1' hour) as varchar) as wstart,
sum(ss_wholesale_cost), sum(ss_sales_price), sum(ss_net_paid),
sum(ss_net_profit)
from store_sales
where ss_coupon_amt > 0
group by tumble(rowtime, interval '1' hour)
having sum(ss_net_profit) > 1000(f0086980bd43a077dcfabd76e996f422).
18/10/31 19:43:59 INFO jobmaster.JobMaster: Close ResourceManager connection 39b6f62c8f9ef7681eb9f284d893ad0c: JobManager is shutting down..
18/10/31 19:43:59 INFO yarn.YarnResourceManager: Disconnect job manager 00000000000000000000000000000...@akka.tcp://flink@node2:43683/user/jobmanager_13 for job f0086980bd43a077dcfabd76e996f422 from the resource manager.
18/10/31 19:44:00 INFO slotpool.SlotPool: Suspending SlotPool.
18/10/31 19:44:00 INFO slotpool.SlotPool: Stopping SlotPool.
18/10/31 19:44:00 INFO jobmaster.JobManagerRunner: JobManagerRunner already shutdown.
18/10/31 19:44:01 INFO yarn.YarnResourceManager: Stopping container container_1538963842459_0857_01_000017.
18/10/31 19:44:01 WARN yarn.YarnResourceManager: Error while calling YARN Node Manager to stop container
org.apache.hadoop.yarn.exceptions.YarnException: Container container_1538963842459_0857_01_000017 is not handled by this NodeManager
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152)
	at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
	at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297)
	at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247)
	at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:307)
	at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:74)
	at org.apache.flink.runtime.resourcemanager.ResourceManager.releaseResource(ResourceManager.java:872)
	at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.releaseResource(ResourceManager.java:1077)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.checkTaskManagerTimeouts(SlotManager.java:914)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.lambda$null$0(SlotManager.java:195)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
18/10/31 19:44:01 INFO yarn.YarnResourceManager: Closing TaskExecutor connection container_1538963842459_0857_01_000017 because: TaskExecutor exceeded the idle timeout.
18/10/31 19:44:01 WARN yarn.YarnResourceManager: Discard registration from TaskExecutor container_1538963842459_0857_01_000017 at (akka.tcp://flink@node9:45391/user/taskmanager_0) because the framework did not recognize it

--
This message was sent by Atlassian JIRA (v7.6.3#76005)