[
https://issues.apache.org/jira/browse/FLINK-24300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415595#comment-17415595
]
Yun Gao edited comment on FLINK-24300 at 9/15/21, 4:27 PM:
-----------------------------------------------------------
From the attached _jstack-2.txt_ (refer to this one rather than _jstack.txt_, which
was taken with the Alibaba JDK and therefore has slightly different line numbers),
the stack of a single task looks like
{code:java}
"Source Data Fetcher for MultipleInput(readOrder=[0,1,1],
members=[\nHashJoin(joinType=[InnerJoin], where=[(d_date_sk =
ws_sold_date_sk)], select=[ws_sold_date_sk, ws_ext_sales_price, d_date_sk,
d_week_seq, d_day_name], isBroadcast=[true], build=[right])\n:-
Union(all=[true], union=[ws_sold_date_sk, ws_ext_sales_price])\n: :- [#2]
TableSourceScan(table=[[hive, tpcds_bin_orc_10000, web_sales,
project=[ws_sold_date_sk, ws_ext_sales_price]]], fields=[ws_sold_date_sk,
ws_ext_sales_price])\n: +- [#3] TableSourceScan(table=[[hive,
tpcds_bin_orc_10000, catalog_sales, project=[cs_sold_date_sk,
cs_ext_sales_price]]], fields=[cs_sold_date_sk, cs_ext_sales_price])\n+- [#1]
Exchange(distribution=[broadcast])\n]) [Source:
HiveSource-tpcds_bin_orc_10000.web_sales, Source:
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq,
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"), ws_ext_sales_price, null:DOUBLE) AS $f1, CASE((d_day_name =
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f2, CASE((d_day_name =
_UTF-16LE'Tuesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f3, CASE((d_day_name =
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f4, CASE((d_day_name =
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f5, CASE((d_day_name =
_UTF-16LE'Friday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f6, CASE((d_day_name =
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f7]) ->
LocalHashAggregate(groupBy=[d_week_seq], select=[d_week_seq, Partial_SUM($f1)
AS sum$0, Partial_SUM($f2) AS sum$1, Partial_SUM($f3) AS sum$2,
Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS sum$4, Partial_SUM($f6) AS
sum$5, Partial_SUM($f7) AS sum$6]) (548/1050)#0" #217 prio=5 os_prio=0
tid=0x00007fcda518f000 nid=0x1632d runnable [0x00007fcc9e0ed000]
java.lang.Thread.State: RUNNABLE
at
java.util.concurrent.CompletableFuture.cleanStack(CompletableFuture.java:497)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:567)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:1068)
at
java.util.concurrent.CompletableFuture$OrRelay.tryFire(CompletableFuture.java:1549)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.moveToAvailable(FutureCompletingBlockingQueue.java:173)
at
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.enqueue(FutureCompletingBlockingQueue.java:347)
at
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.put(FutureCompletingBlockingQueue.java:211)
--
"Source Data Fetcher for MultipleInput(readOrder=[0,1,1],
members=[\nHashJoin(joinType=[InnerJoin], where=[(d_date_sk =
ws_sold_date_sk)], select=[ws_sold_date_sk, ws_ext_sales_price, d_date_sk,
d_week_seq, d_day_name], isBroadcast=[true], build=[right])\n:-
Union(all=[true], union=[ws_sold_date_sk, ws_ext_sales_price])\n: :- [#2]
TableSourceScan(table=[[hive, tpcds_bin_orc_10000, web_sales,
project=[ws_sold_date_sk, ws_ext_sales_price]]], fields=[ws_sold_date_sk,
ws_ext_sales_price])\n: +- [#3] TableSourceScan(table=[[hive,
tpcds_bin_orc_10000, catalog_sales, project=[cs_sold_date_sk,
cs_ext_sales_price]]], fields=[cs_sold_date_sk, cs_ext_sales_price])\n+- [#1]
Exchange(distribution=[broadcast])\n]) [Source:
HiveSource-tpcds_bin_orc_10000.web_sales, Source:
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq,
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"), ws_ext_sales_price, null:DOUBLE) AS $f1, CASE((d_day_name =
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f2, CASE((d_day_name =
_UTF-16LE'Tuesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f3, CASE((d_day_name =
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f4, CASE((d_day_name =
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f5, CASE((d_day_name =
_UTF-16LE'Friday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f6, CASE((d_day_name =
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f7]) ->
LocalHashAggregate(groupBy=[d_week_seq], select=[d_week_seq, Partial_SUM($f1)
AS sum$0, Partial_SUM($f2) AS sum$1, Partial_SUM($f3) AS sum$2,
Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS sum$4, Partial_SUM($f6) AS
sum$5, Partial_SUM($f7) AS sum$6]) (548/1050)#0" #216 prio=5 os_prio=0
tid=0x00007fcda51be000 nid=0x16327 waiting on condition [0x00007fcc9e1ee000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000005067ffcf0> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at org.apache.flink.connector.file.src.util.Pool.pollEntry(Pool.java:82)
at
org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.getCachedEntry(AbstractOrcFileInputFormat.java:292)
at
org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.readBatch(AbstractOrcFileInputFormat.java:256)
at
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
--
"MultipleInput(readOrder=[0,1,1], members=[\nHashJoin(joinType=[InnerJoin],
where=[(d_date_sk = ws_sold_date_sk)], select=[ws_sold_date_sk,
ws_ext_sales_price, d_date_sk, d_week_seq, d_day_name], isBroadcast=[true],
build=[right])\n:- Union(all=[true], union=[ws_sold_date_sk,
ws_ext_sales_price])\n: :- [#2] TableSourceScan(table=[[hive,
tpcds_bin_orc_10000, web_sales, project=[ws_sold_date_sk,
ws_ext_sales_price]]], fields=[ws_sold_date_sk, ws_ext_sales_price])\n: +-
[#3] TableSourceScan(table=[[hive, tpcds_bin_orc_10000, catalog_sales,
project=[cs_sold_date_sk, cs_ext_sales_price]]], fields=[cs_sold_date_sk,
cs_ext_sales_price])\n+- [#1] Exchange(distribution=[broadcast])\n]) [Source:
HiveSource-tpcds_bin_orc_10000.web_sales, Source:
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq,
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"), ws_ext_sales_price, null:DOUBLE) AS $f1, CASE((d_day_name =
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f2, CASE((d_day_name =
_UTF-16LE'Tuesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f3, CASE((d_day_name =
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f4, CASE((d_day_name =
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f5, CASE((d_day_name =
_UTF-16LE'Friday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f6, CASE((d_day_name =
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"),
ws_ext_sales_price, null:DOUBLE) AS $f7]) ->
LocalHashAggregate(groupBy=[d_week_seq], select=[d_week_seq, Partial_SUM($f1)
AS sum$0, Partial_SUM($f2) AS sum$1, Partial_SUM($f3) AS sum$2,
Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS sum$4, Partial_SUM($f6) AS
sum$5, Partial_SUM($f7) AS sum$6]) (548/1050)#0" #200 prio=5 os_prio=0
tid=0x00007fcda63ee800 nid=0x1624d waiting on condition [0x00007fcc9edfa000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000004fd5fce68> (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.poll(FutureCompletingBlockingQueue.java:257)
{code}
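The RUNNABLE fetcher thread above spends its time in {{CompletableFuture.cleanStack}}, reached from {{FutureCompletingBlockingQueue.put}} through an {{OrRelay}} (i.e. an {{anyOf}}/{{applyToEither}} combination). One way such a call becomes expensive is when the other source of the {{OrRelay}} is a long-lived future carrying a large stack of dependents: completing each short-lived future then triggers a full traversal of that stack. The following is a standalone, JDK-only sketch of that effect (the class name and sizes are made up); whether this is exactly what happens with Flink's availability futures here still needs to be confirmed.
{code:java}
import java.util.concurrent.CompletableFuture;

/**
 * Standalone JDK sketch (no Flink code): completing many short-lived futures
 * that are anyOf-combined with one long-lived future spends most of its time
 * in CompletableFuture.cleanStack, because every completion triggers a
 * traversal of the long-lived future's dependent stack.
 */
public class CleanStackCostSketch {

    public static void main(String[] args) {
        CompletableFuture<Void> longLived = new CompletableFuture<>();

        // Pile up dependents that never fire (longLived never completes),
        // so its internal Completion stack stays long.
        for (int i = 0; i < 100_000; i++) {
            longLived.thenRun(() -> { });
        }

        long start = System.nanoTime();
        for (int i = 0; i < 1_000; i++) {
            CompletableFuture<Void> shortLived = new CompletableFuture<>();
            // The OrRelay created by anyOf is pushed onto BOTH sources' stacks.
            CompletableFuture.anyOf(longLived, shortLived);
            // Completing the short-lived side fires the OrRelay, whose postFire
            // calls cleanStack on the still-incomplete longLived, walking its
            // ~100k-node stack every single time.
            shortLived.complete(null);
        }
        System.out.printf("1000 short-lived completions took %.1f ms%n",
                (System.nanoTime() - start) / 1e6);
    }
}
{code}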
> MultipleInputOperator is running much more slowly in TPCDS
> ----------------------------------------------------------
>
> Key: FLINK-24300
> URL: https://issues.apache.org/jira/browse/FLINK-24300
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.14.0, 1.15.0
> Reporter: Zhilong Hong
> Priority: Major
> Attachments: 64570e4c56955713ca599fd1d7ae7be891a314c6.png,
> detail-of-the-job.png, e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png,
> jstack-2.txt, jstack.txt
>
>
> When running TPCDS with release 1.14, we found that jobs with a
> MultipleInputOperator run much more slowly than before. A binary search over
> the commits suggests that the issue may have been introduced by FLINK-23408.
> At commit 64570e4c56955713ca599fd1d7ae7be891a314c6, the job runs normally in
> TPCDS, as the image below illustrates:
> !64570e4c56955713ca599fd1d7ae7be891a314c6.png|width=600!
> At commit e3010c16947ed8da2ecb7d89a3aa08dacecc524a, the job q2.sql gets stuck
> for a pretty long time (longer than half an hour), as the image below
> illustrates:
> !e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png|width=600!
> The details of the job are illustrated below:
> !detail-of-the-job.png|width=600!
> The job uses a {{MultipleInputOperator}} with one normal input and two
> chained FileSources. It has finished reading the normal input and has started
> to read the chained sources. Each chained source has one source data fetcher.
> We captured the jstack of the stuck tasks and attached the files below. From
> [^jstack.txt] we can see that the main thread is blocked waiting for the lock,
> which is held by a source data fetcher. The source data fetcher is still
> running, but its stack stays in {{CompletableFuture.cleanStack}} (see the
> sketch after this description).
> This issue happens in a batch job. However, judging from where it gets
> blocked, it seems to also affect streaming jobs.
> For reference, the TPCDS code we are running is located at
> [https://github.com/ververica/flink-sql-benchmark/tree/dev].
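To make the interaction between the two threads concrete, here is a deliberately simplified, hypothetical model (the class name and structure are made up; it is not Flink's actual {{FutureCompletingBlockingQueue}}): the fetcher completes the availability future while still holding the queue's {{ReentrantLock}}, so any slowdown inside {{complete()}}, such as the long {{cleanStack}} walks seen above, directly extends how long the task thread stays parked in {{poll()}}.
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/** Toy model of the observed contention; NOT the Flink implementation. */
class ToyFutureCompletingQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<T> elements = new ArrayDeque<>();
    private volatile CompletableFuture<Void> availability = new CompletableFuture<>();

    /** Called by the source data fetcher thread. */
    void put(T element) {
        lock.lock();
        try {
            elements.add(element);
            // Fires all dependents registered on the availability future while
            // the lock is still held; if this walk is slow, poll() is blocked
            // for the whole duration.
            availability.complete(null);
        } finally {
            lock.unlock();
        }
    }

    /** Called by the task (mailbox) thread; this is where the third stack is parked. */
    T poll() {
        lock.lock();
        try {
            T element = elements.poll();
            if (elements.isEmpty()) {
                availability = new CompletableFuture<>();
            }
            return element;
        } finally {
            lock.unlock();
        }
    }

    CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }
}
{code}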