[jira] [Comment Edited] (FLINK-22416) UpsertKafkaTableITCase hangs when collecting results

2021-08-24 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404128#comment-17404128
 ] 

Qingsheng Ren edited comment on FLINK-22416 at 8/25/21, 2:36 AM:
-

This issue has the same root cause as FLINK-22198 and should be fixed on
master by commit 7814ee257b526d52f80a17143e228fb936b03ff5. Please reopen the ticket if
it happens again.


was (Author: renqs):
This issue has the same root cause as FLINK-22198 and should be fixed on
master by commit 7814ee257b526d52f80a17143e228fb936b03ff5

> UpsertKafkaTableITCase hangs when collecting results
> 
>
> Key: FLINK-22416
> URL: https://issues.apache.org/jira/browse/FLINK-22416
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Ecosystem
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: stale-assigned, test-stability
> Fix For: 1.14.0
>
> Attachments: idea-test.png, threads_report.txt
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17037&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=7002
> {code}
> 2021-04-22T11:16:35.6812919Z Apr 22 11:16:35 [ERROR] 
> testSourceSinkWithKeyAndPartialValue[format = 
> csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase)
>   Time elapsed: 30.01 s  <<< ERROR!
> 2021-04-22T11:16:35.6814151Z Apr 22 11:16:35 
> org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
> 2021-04-22T11:16:35.6814781Z Apr 22 11:16:35  at 
> java.lang.Thread.sleep(Native Method)
> 2021-04-22T11:16:35.6815444Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237)
> 2021-04-22T11:16:35.6816250Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113)
> 2021-04-22T11:16:35.6817033Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> 2021-04-22T11:16:35.6817719Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-04-22T11:16:35.6818351Z Apr 22 11:16:35  at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> 2021-04-22T11:16:35.6818980Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows(KafkaTableTestUtils.java:52)
> 2021-04-22T11:16:35.6819978Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.testSourceSinkWithKeyAndPartialValue(UpsertKafkaTableITCase.java:147)
> 2021-04-22T11:16:35.6820803Z Apr 22 11:16:35  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-22T11:16:35.6821365Z Apr 22 11:16:35  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-22T11:16:35.6822072Z Apr 22 11:16:35  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-22T11:16:35.6822656Z Apr 22 11:16:35  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-22T11:16:35.6823124Z Apr 22 11:16:35  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-22T11:16:35.6823672Z Apr 22 11:16:35  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-22T11:16:35.6824202Z Apr 22 11:16:35  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-22T11:16:35.6824709Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-22T11:16:35.6825230Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-04-22T11:16:35.6825716Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-04-22T11:16:35.6826204Z Apr 22 11:16:35  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-04-22T11:16:35.6826807Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2021-04-22T11:16:35.6827378Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2021-04-22T11:16:35.6827926Z Apr 22 11:16:35  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-04-22T11:16:35.6828331Z Apr 22 11:16:35  at 
> java.

[jira] [Comment Edited] (FLINK-22416) UpsertKafkaTableITCase hangs when collecting results

2021-06-30 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372111#comment-17372111
 ] 

Arvid Heise edited comment on FLINK-22416 at 6/30/21, 4:42 PM:
---

Looking at the stack traces:
- All subtasks are waiting for data.
- The legacy source thread is waiting for data to arrive.
- The Kafka consumer thread is waiting for partition assignments.

So it seems there is simply no data after recovery. I guess we need to
trace why {{hasAssignedPartitions == false}}.


{noformat}
"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (4/4)#0" #1434 daemon prio=5 os_prio=0 
tid=0x7fd8c0003000 nid=0x4681 waiting on condition [0x7fd8187c6000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xf8c34308> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue.getBatchBlocking(ClosableBlockingQueue.java:396)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:240)
{noformat}

This may be similar to FLINK-21996, but as far as I know that ticket was about
the new sources only, whereas here we have a legacy source. [~sewen], could
the same issue occur with the old sources?
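The thread dump shows the Kafka consumer thread parked inside {{ClosableBlockingQueue.getBatchBlocking}}. The park is untimed ({{LockSupport.park}} / {{ConditionObject.await}}). That fits a thread that has no assigned partitions and is waiting for new ones to be handed over. The stdlib-only Java sketch below models that waiting pattern under this assumption. {{PartitionAssignmentWait}}, {{handOver}} and {{awaitAssignment}} are hypothetical names, not Flink's {{KafkaConsumerThread}} API. The bounded wait exists only to make the stall observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the wait seen in the thread dump.
// This is NOT Flink's KafkaConsumerThread; all names are illustrative.
final class PartitionAssignmentWait {
    // Partitions handed over to the consumer thread but not yet assigned.
    private final LinkedBlockingQueue<String> newPartitions = new LinkedBlockingQueue<>();
    // Partitions the consumer is currently reading from.
    private final List<String> assigned = new ArrayList<>();

    void handOver(String partition) {
        newPartitions.add(partition);
    }

    boolean hasAssignedPartitions() {
        return !assigned.isEmpty();
    }

    /**
     * If nothing is assigned yet, waits up to timeoutMs for new partitions.
     * The thread in the dump parks without a timeout, so in the failing run
     * it would stay in this state indefinitely.
     */
    boolean awaitAssignment(long timeoutMs) {
        if (!hasAssignedPartitions()) {
            try {
                String first = newPartitions.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (first != null) {
                    assigned.add(first);
                    newPartitions.drainTo(assigned); // pick up partitions that arrived together
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
        }
        return hasAssignedPartitions();
    }

    public static void main(String[] args) {
        PartitionAssignmentWait stalled = new PartitionAssignmentWait();
        System.out.println("stalled: " + stalled.awaitAssignment(100)); // stalled: false

        PartitionAssignmentWait healthy = new PartitionAssignmentWait();
        healthy.handOver("key_partial_value_topic_csv-0");
        System.out.println("healthy: " + healthy.awaitAssignment(100)); // healthy: true
    }
}
```

In this model, "no data after recovery" corresponds to the stalled case: nothing is ever handed over, so {{hasAssignedPartitions()}} stays false.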


was (Author: arvid):
Looking at the stack traces:
- All subtasks are waiting for data.
- The legacy source thread is waiting for data to arrive.
- The Kafka consumer thread is waiting for partition assignments.

So it seems there is simply no data after recovery. I guess we need to
trace why {{hasAssignedPartitions == false}}.


{noformat}
"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (4/4)#0" #1434 daemon prio=5 os_prio=0 
tid=0x7fd8c0003000 nid=0x4681 waiting on condition [0x7fd8187c6000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xf8c34308> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue.getBatchBlocking(ClosableBlockingQueue.java:396)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:240)
{noformat}


> UpsertKafkaTableITCase hangs when collecting results
> 
>
> Key: FLINK-22416
> URL: https://issues.apache.org/jira/browse/FLINK-22416
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Ecosystem
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Assignee: Qingsheng Ren
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.14.0
>
> Attachments: idea-test.png, threads_report.txt
>
>

[jira] [Comment Edited] (FLINK-22416) UpsertKafkaTableITCase hangs when collecting results

2021-06-30 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372111#comment-17372111
 ] 

Arvid Heise edited comment on FLINK-22416 at 6/30/21, 4:40 PM:
---

Looking at the stack traces:
- All subtasks are waiting for data.
- The legacy source thread is waiting for data to arrive.
- The Kafka consumer thread is waiting for partition assignments.

So it seems there is simply no data after recovery. I guess we need to
trace why {{hasAssignedPartitions == false}}.


{noformat}
"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (4/4)#0" #1434 daemon prio=5 os_prio=0 
tid=0x7fd8c0003000 nid=0x4681 waiting on condition [0x7fd8187c6000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xf8c34308> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue.getBatchBlocking(ClosableBlockingQueue.java:396)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:240)
{noformat}



was (Author: arvid):
Looking at the stack traces:
- All subtasks are waiting for data.
- The legacy source thread is waiting for data to arrive.
- The Kafka consumer thread is waiting for partition assignments.

So it seems there is simply no data after recovery. I guess we need to
trace why {{hasAssignedPartitions == false}}.


[jira] [Comment Edited] (FLINK-22416) UpsertKafkaTableITCase hangs when collecting results

2021-04-27 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17332997#comment-17332997
 ] 

Jark Wu edited comment on FLINK-22416 at 4/27/21, 7:19 AM:
---

Removing the timeout is meant to help us locate the problem. Let's observe the
test for a while.

As a side note, if the problem is related to this comment:
https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17326449
we may need to allow failover restarts in
{{org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase#setup}}.


was (Author: jark):
Removing the timeout is meant to help us locate the problem.

If the problem is related to this comment:
https://issues.apache.org/jira/browse/FLINK-21996?focusedCommentId=17326449&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17326449
we may need to allow failover restarts in
{{org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase#setup}}.

> UpsertKafkaTableITCase hangs when collecting results
> 
>
> Key: FLINK-22416
> URL: https://issues.apache.org/jira/browse/FLINK-22416
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Ecosystem
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.13.0
>
> Attachments: idea-test.png, threads_report.txt
>
>

[jira] [Comment Edited] (FLINK-22416) UpsertKafkaTableITCase hangs when collecting results

2021-04-22 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17329266#comment-17329266
 ] 

Stephan Ewen edited comment on FLINK-22416 at 4/22/21, 4:44 PM:


At first glance, I could not correlate this with any Operator Coordinator
changes. The failures might be either legitimate timeouts in the test or an issue with
result fetching.

Everything switches properly to RUNNING and then the test times out.
{code}
11:15:25,353 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (2/4) 
(e76de9f0b5f935f2830579334640a391) switched from INITIALIZING to RUNNING.
11:15:25,355 INFO  
org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer 
clientId=consumer-64, groupId=null] Resetting offset for partition 
key_partial_value_topic_csv-0 to offset 4.
11:15:25,355 INFO  org.apache.flink.runtime.taskmanager.Task
[] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (1/4)#0 
(fd516fced016ff6800bf67393a8f1caf) switched from INITIALIZING to RUNNING.
11:15:25,357 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (1/4) 
(fd516fced016ff6800bf67393a8f1caf) switched from INITIALIZING to RUNNING.
11:15:54,449 [main] ERROR 
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase [] - 
Test testSourceSinkWithKeyAndPartialValue[format = 
csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase) 
failed with:
org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
{code}
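The stack trace in the issue description places the test's main thread in {{CollectResultFetcher.sleepBeforeRetry}}, called from {{CollectResultFetcher.next}}. The client keeps asking the collect sink for results and sleeps between empty answers. If the source never produces data, the loop simply keeps sleeping, and the only visible symptom is the JUnit timeout.

The stdlib-only sketch below is a hypothetical model of such a loop. The {{PollingFetchSketch.fetchUntil}} helper, its fixed retry delay and its explicit deadline are assumptions, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of a poll-and-sleep result fetcher; not Flink's CollectResultFetcher.
final class PollingFetchSketch {

    /**
     * Polls fetchBatch until `expected` rows are collected. An empty batch makes
     * the caller sleep and retry; an empty batch after the deadline throws
     * instead of looping forever.
     */
    static <T> List<T> fetchUntil(Supplier<List<T>> fetchBatch, int expected,
                                  long deadlineMs, long retryDelayMs) {
        List<T> rows = new ArrayList<>();
        long giveUpAt = System.currentTimeMillis() + deadlineMs;
        while (rows.size() < expected) {
            List<T> batch = fetchBatch.get();
            if (!batch.isEmpty()) {
                rows.addAll(batch);
                continue;
            }
            if (System.currentTimeMillis() >= giveUpAt) {
                throw new IllegalStateException(
                        "no new rows before the deadline: got " + rows.size() + " of " + expected);
            }
            sleepBeforeRetry(retryDelayMs); // a stalled job leaves the caller parked here
        }
        return rows;
    }

    private static void sleepBeforeRetry(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for rows", e);
        }
    }
}
```

In the failing runs no rows ever arrived, so the equivalent of the sleep-and-retry branch kept running until the 30-second JUnit timeout fired.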

The only striking thing related to the result fetcher is the line below, but
it seems to happen regularly when the fetcher starts up and does not
indicate an error.

{code}
11:15:25,296 INFO  
org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - 
Invalid request. Received version = , offset = 0, while expected version = 
bde5cdc9-94e3-403c-8f10-827a65bd5c74, offset = 0
{code}
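The log line is consistent with a version check between the result fetcher and the collect sink:

* A freshly started fetcher sends an empty version.
* The sink answers with no rows plus its own version.
* The next request uses that version.

That would explain why the message appears on every startup. The sketch below models the exchange under this assumption. {{CollectHandshakeSketch}}, {{Sink}} and {{request}} are hypothetical names, not Flink's API, and the real implementation is more involved.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical model of a versioned fetch request; not Flink's CollectSinkFunction.
final class CollectHandshakeSketch {

    static final class Response {
        final String version;
        final List<String> rows;

        Response(String version, List<String> rows) {
            this.version = version;
            this.rows = rows;
        }
    }

    static final class Sink {
        private final String version;
        private final List<String> buffered;

        Sink(String version, List<String> buffered) {
            this.version = version;
            this.buffered = buffered;
        }

        /** Returns buffered rows from `offset` on, but only to a client that knows the current version. */
        Response request(String requestVersion, long offset) {
            if (!Objects.equals(version, requestVersion)) {
                // Corresponds to the "Invalid request" INFO line: no rows, but the
                // response carries the version the client should use next time.
                return new Response(version, Collections.<String>emptyList());
            }
            int from = (int) Math.min(offset, buffered.size());
            return new Response(version, buffered.subList(from, buffered.size()));
        }
    }

    public static void main(String[] args) {
        Sink sink = new Sink("bde5cdc9-94e3-403c-8f10-827a65bd5c74", Arrays.asList("r1", "r2"));

        String clientVersion = ""; // a freshly started fetcher does not know the version yet
        Response first = sink.request(clientVersion, 0);
        System.out.println("first: " + first.rows); // first: []

        clientVersion = first.version; // adopt the version reported by the sink
        Response second = sink.request(clientVersion, 0);
        System.out.println("second: " + second.rows); // second: [r1, r2]
    }
}
```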

The second test failure looks like it times out while the job is
waiting for slots. For some reason, the "after test cancel pending jobs" action
doesn't seem to work properly after the first test timeout (maybe related to
the execution with timeout?).

*Action:* I would suggest removing the timeout rule from that test to make
sure we get a thread dump on the next failure.
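This suggestion trades the 30-second timeout for a thread dump on the next hang. For comparison, here is a stdlib-only sketch of another option: keep a deadline, but capture every thread's stack before giving up. {{DumpOnTimeout}} and {{runOrDump}} are hypothetical helpers, not part of Flink's or JUnit's test utilities.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Flink's or JUnit's test utilities.
final class DumpOnTimeout {

    /** Formats every live thread's name, state and stack, similar to a jstack dump. */
    static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            sb.append('"').append(thread.getName()).append("\" ")
              .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    /**
     * Runs the action on a worker thread. Returns null if it finishes in time;
     * otherwise returns a thread dump taken while the worker is still stuck.
     */
    static String runOrDump(Runnable action, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(action);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return null;
        } catch (TimeoutException e) {
            String dump = dumpAllThreads(); // capture before cancelling the worker
            future.cancel(true);
            return dump;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("action failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Returning the dump instead of throwing keeps the sketch small. A real test would more likely log the dump and then fail.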


was (Author: stephanewen):
At first glance, I could not correlate this with any Operator Coordinator
changes. The failures might be either legitimate timeouts in the test or an issue with
result fetching.

Everything switches properly to RUNNING and then the test times out.
{code}
11:15:25,353 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (2/4) 
(e76de9f0b5f935f2830579334640a391) switched from INITIALIZING to RUNNING.
11:15:25,355 INFO  
org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer 
clientId=consumer-64, groupId=null] Resetting offset for partition 
key_partial_value_topic_csv-0 to offset 4.
11:15:25,355 INFO  org.apache.flink.runtime.taskmanager.Task
[] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (1/4)#0 
(fd516fced016ff6800bf67393a8f1caf) switched from INITIALIZING to RUNNING.
11:15:25,357 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (1/4) 
(fd516fced016ff6800bf67393a8f1caf) switched from INITIALIZING to RUNNING.
11:15:54,449 [main] ERROR 
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase [] - 
Test testSourceSinkWithKeyAndPartialValue[format = 
csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase) 
failed with:
org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
{code}

I saw this line here, which is the only thing relating