看起来是一个已知问题: https://issues.apache.org/jira/browse/FLINK-17479

On Wed, 17 Jun 2020 at 11:00, hb <343122...@163.com> wrote:

> flink1.10.1 写的 SQL 作业, 开始运行3个小时正常,  checkpoint也正常.
> 然后,checkpoint失败了, 作业一直卡在RESTARTING 状态不动.
>
> TaskManager 日志:
> 2020-06-16 20:38:16,640 INFO org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator - [Consumer clientId=consumer-11, groupId=] Discovered
> group coordinator 172.16.30.165:9092 (id: 2147483645 rack: null)
> 2020-06-16 23:27:46,026 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5
> c29783b8f7ed8bfb1a7723f5c4216b1).
> 2020-06-16 23:27:46,027 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (2/2)
> (ed41475641eb3c58f7504d4a16a9c19b).
> 2020-06-16 23:27:46,034 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a).
> 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a) switched from
> RUNNING to FAILED.
> java.lang.Exception: Could not perform checkpoint 329 for operator Source:
> KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (1/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpoint(StreamTask.java:785)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/
> 1681613747.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(
> StreamTaskActionExecutor.java:87)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail
> .java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:469)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.shaded.netty4.io.netty.util.
> IllegalReferenceCountException: refCnt: 0
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf
> .ensureAccessible(AbstractByteBuf.java:1464)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf
> .checkReadableBytes0(AbstractByteBuf.java:1448)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf
> .readLong(AbstractByteBuf.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .checkpointState(StreamTask.java:964)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$performCheckpoint$5(StreamTask.java:870)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/
> 890021961.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(
> StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .performCheckpoint(StreamTask.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpoint(StreamTask.java:776)
>     ... 12 more
> 2020-06-16 23:27:46,045 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[order_id, shop_id, item_id, edit_time],
> where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> LookupJoin
> (table=[JDBCTableSource(id, category_id_first, brand, category)],
> joinType=[InnerJoin], async=[false], lookup=[id=item_id], where=[brand IS
> NOT NULL], select=[order_id, shop_id, item_id, edit_time, id, brand]) ->
> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id,
> brand, order_id]) (1/2) (0527ac291f32f14675f9d5bec8ef5369).
> 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey
> _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)])
> (2/2) (2d984d0e7df4ed2a6fa1d8892fcccefc).
> 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey
> _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)])
> (1/2) (32b2422a4619f4900c5a668dfb466ec9).
> 2020-06-16 23:27:46,037 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09).
> 2020-06-16 23:27:46,050 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09) switched from
> RUNNING to FAILED.
> java.lang.Exception: Could not perform checkpoint 329 for operator Source:
> KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (2/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpoint(StreamTask.java:785)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/
> 1681613747.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(
> StreamTaskActionExecutor.java:87)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail
> .java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:469)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8)
> exceeds writerIndex(0): Buffer 1 (freed)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf
> .checkReadableBytes0(AbstractByteBuf.java:1451)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf
> .readLong(AbstractByteBuf.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .checkpointState(StreamTask.java:964)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$performCheckpoint$5(StreamTask.java:870)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/
> 890021961.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(
> StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .performCheckpoint(StreamTask.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpoint(StreamTask.java:776)
>     ... 12 more
> 2020-06-16 23:27:46,051 INFO org.apache.flink.runtime.taskmanager.Task -
> Triggering cancellation of task code Source: KafkaTableSource(id,
> order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id,
> item_name, item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09).
> 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) 
> (5c29783b8f7ed8bfb1a7723f5c4216b1)
> switched from RUNNING to FAILED.
> java.lang.Exception: Could not perform checkpoint 329 for operator Source:
> KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpoint(StreamTask.java:785)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/
> 1681613747.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(
> StreamTaskActionExecutor.java:87)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail
> .java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:469)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .checkpointState(StreamTask.java:964)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$performCheckpoint$5(StreamTask.java:870)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/
> 890021961.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(
> StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .performCheckpoint(StreamTask.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpoint(StreamTask.java:776)
>     ... 12 more
>
> 求指点啊.
>
>
>
>

Reply via email to