[ https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624834#comment-17624834 ]
shizhengchao commented on FLINK-29772: -------------------------------------- kafka interval join, deadlock occurs when using rocksdb > Kafka table source scan blocked > ------------------------------- > > Key: FLINK-29772 > URL: https://issues.apache.org/jira/browse/FLINK-29772 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.13.2 > Reporter: shizhengchao > Priority: Major > > {code:java} > // > "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, > watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type, > cluster_name, server_time, server_time_s, client_time, client_time_s, imei, > request_id, owner_id, service_id, content_id, sign_id, receiver_type, > msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, > array_info_imei, phone, channel_id, process_time, code, msg, receiver, > content_type, android_version, apk_version]) -> Calc(select=[data_type, > server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS > proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> > Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', > _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND server_time IS NOT NULL), server_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND client_time IS NOT NULL), client_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10))))) AS client_time, IF(((data_type = > _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS > send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], > where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) > CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER > SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 > BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: > TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, > watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type, > cluster_name, server_time, server_time_s, client_time, client_time_s, imei, > request_id, owner_id, service_id, content_id, sign_id, receiver_type, > msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, > array_info_imei, phone, channel_id, process_time, code, msg, receiver, > content_type, android_version, apk_version]) -> Calc(select=[data_type, > server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS > proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> > Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', > _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND server_time IS NOT NULL), server_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND client_time IS NOT NULL), client_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10))))) AS client_time, IF(((data_type = > _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS > send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], > where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) > CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER > SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87 > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > -blocked on java.lang.Object@4aa3fe44 > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$556/1624913200.run(Unknown > Source) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)