[jira] [Updated] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-29772:
---------------------------------
Description: 
{code:java}
//
"Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 BLOCKED on 
java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: 
TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
-  blocked on java.lang.Object@4aa3fe44
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at 

[jira] [Updated] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-29772:
---------------------------------
Description: 
{code:java}
//
"Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 
{color:red}BLOCKED on java.lang.Object@4aa3fe44 owned by{color} "Legacy Source 
Thread - Source: TableSourceScan(table=[[ostream, user_mart, 
dwd_ads_isms_msgmiddle, watermark=[-(toTimeStamps($2), 1:INTERVAL 
SECOND)]]], fields=[data_type, cluster_name, server_time, server_time_s, 
client_time, client_time_s, imei, request_id, owner_id, service_id, content_id, 
sign_id, receiver_type, msg_type, handle_type, reach_type, source_type, 
create_time, msg_id, imsi, array_info_imei, phone, channel_id, process_time, 
code, msg, receiver, content_type, android_version, apk_version]) -> 
Calc(select=[data_type, server_time, client_time, msg_id, array_info_imei, 
code, PROCTIME() AS proctime, Reinterpret(toTimeStamps(server_time)) AS 
rowtime]) -> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, 
_UTF-16LE'#', _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, 
Sarg[(-∞.._UTF-16LE'NULL'), (_UTF-16LE'NULL'.._UTF-16LE'null'), 
(_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER SET "UTF-16LE") AND server_time IS NOT 
NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
-{color:red} blocked on java.lang.Object@4aa3fe44{color}
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at