shizhengchao created FLINK-25124:
------------------------------------
Summary: A deadlock occurs when the jdbc sink uses two consecutive
dimension tables to associate
Key: FLINK-25124
URL: https://issues.apache.org/jira/browse/FLINK-25124
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.13.1
Reporter: shizhengchao
The sql statement is as follows:
{code:java}
//代码占位符
INSERT INTO imei_phone_domestic_realtime
SELECT
t.data.imei AS imei,
CAST(t.data.register_date_key AS bigint) AS register_date_key,
c.agent_type AS channel_name,
c.agent_short_name,
c.agent_name,
c.agent_chinese_name,
c.isforeign AS agent_market_type,
p.seriename AS series_name,
p.salename AS sale_name,
p.devname AS dev_name,
p.devnamesource AS dev_name_source,
p.color,
p.isforeign AS product_market_type,
p.carrier,
p.lcname AS life_cycle,
IFNULL(p.shipping_price,0) AS shipping_price,
IFNULL(p.retail_price,0) AS retail_price
FROM kafka_imei_phone_domestic_realtime AS t
LEFT JOIN dim_product FOR SYSTEM_TIME AS OF t.proctime AS p ON
p.pn=t.item_code
LEFT JOIN dim_customer FOR SYSTEM_TIME AS OF t.proctime AS c ON
c.customer_code=t.customer_code
where t.eventType='update'; {code}
There will be a probability of deadlock:
{code:java}
//代码占位符
"jdbc-upsert-output-format-thread-1" Id=84 BLOCKED on
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af
owned by "Legacy Source Thread - Source:
TableSourceScan(table=[[default_catalog, default_database,
kafka_imei_phone_domestic_realtime]], fields=[data, eventType]) ->
Calc(select=[data, data.item_code AS $f3], where=[(eventType =
_UTF-16LE'update':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) ->
LookupJoin(table=[default_catalog.default_database.dim_product],
joinType=[LeftOuterJoin], async=[false], lookup=[pn=$f3], select=[data, $f3,
pn, color, isforeign, devname, salename, seriename, lcname, carrier,
devnamesource, shipping_price, retail_price]) -> Calc(select=[data, color,
isforeign, devname, salename, seriename, lcname, carrier, devnamesource,
shipping_price, retail_price, data.customer_code AS $f31]) ->
LookupJoin(table=[default_catalog.default_database.dim_customer],
joinType=[LeftOuterJoin], async=[false], lookup=[customer_code=$f31],
select=[data, color, isforeign, devname, salename, seriename, lcname, carrier,
devnamesource, shipping_price, retail_price, $f31, customer_code,
agent_short_name, agent_name, isforeign, agent_type, agent_chinese_name]) ->
Calc(select=[data.imei AS imei, CAST(data.register_date_key) AS
register_date_key, agent_type AS channel_name, agent_short_name, agent_name,
agent_chinese_name, isforeign0 AS agent_market_type, seriename AS series_name,
salename AS sale_name, devname AS dev_name, devnamesource AS dev_name_source,
color, isforeign AS product_market_type, carrier, lcname AS life_cycle,
IFNULL(shipping_price, 0:DECIMAL(10, 0)) AS shipping_price,
IFNULL(retail_price, 0:DECIMAL(10, 0)) AS retail_price]) ->
NotNullEnforcer(fields=[imei]) -> Sink:
Sink(table=[default_catalog.default_database.imei_phone_domestic_realtime],
fields=[imei, register_date_key, channel_name, agent_short_name, agent_name,
agent_chinese_name, agent_market_type, series_name, sale_name, dev_name,
dev_name_source, color, product_market_type, carrier, life_cycle,
shipping_price, retail_price]) (6/12)#0" Id=82
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:124)
- blocked on
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat$$Lambda$344/21845506.run(Unknown
Source)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... Number of locked synchronizers = 1
- java.util.concurrent.ThreadPoolExecutor$Worker@325612a2 {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)