[ https://issues.apache.org/jira/browse/FLINK-29772 ]


    shizhengchao deleted comment on FLINK-29772:
    --------------------------------------

was (Author: tinny):
Kafka interval join: a deadlock occurs when using the RocksDB state backend.

> Kafka table source scan blocked
> -------------------------------
>
>                 Key: FLINK-29772
>                 URL: https://issues.apache.org/jira/browse/FLINK-29772
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.2
>            Reporter: shizhengchao
>            Priority: Major
>
> {code:java}
> //
> "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10))))) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 
> BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: 
> TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10))))) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     -blocked on java.lang.Object@4aa3fe44
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$556/1624913200.run(Unknown Source)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to