[ https://issues.apache.org/jira/browse/FLINK-21431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322760#comment-17322760 ]
Leonard Xu commented on FLINK-21431:
------------------------------------

[~maguowei] I'll create a PR for 1.12 branch

> UpsertKafkaTableITCase.testTemporalJoin hang
> --------------------------------------------
>
>                 Key: FLINK-21431
>                 URL: https://issues.apache.org/jira/browse/FLINK-21431
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.12.2, 1.13.0
>            Reporter: Guowei Ma
>            Assignee: Leonard Xu
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.13.0
>
>
> This case hangs almost 3 hours:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13543&view=logs&j=ce8f3cc3-c1ea-5281-f5eb-df9ebd24947f&t=f266c805-9429-58ed-2f9e-482e7b82f58b
> {code:java}
> Test testTemporalJoin[format = csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase) is running.
> --------------------------------------------------------------------------------
> 23:08:43,259 [ main] INFO org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl [] - Creating topic users_csv
> 23:08:45,303 [ main] WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified. Setting it to 3600000 ms
> 23:08:45,430 [ChangelogNormalize(key=[user_id]) -> Calc(select=[user_id, user_name, region, CAST(modification_time) AS timestamp]) -> Sink: Sink(table=[default_catalog.default_database.users_csv], fields=[user_id, user_name, region, timestamp]) (1/1)#0] WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Using AT_LEAST_ONCE semantic, but checkpointing is not enabled. Switching to NONE semantic.
> 23:08:45,438 [ChangelogNormalize(key=[user_id]) -> Calc(select=[user_id, user_name, region, CAST(modification_time) AS timestamp]) -> Sink: Sink(table=[default_catalog.default_database.users_csv], fields=[user_id, user_name, region, timestamp]) (1/1)#0] INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic users_csv
> 23:08:45,791 [Source: TableSourceScan(table=[[default_catalog, default_database, users_csv, watermark=[CAST($3):TIMESTAMP(3)]]], fields=[user_id, user_name, region, timestamp]) (1/1)#0] INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 has no restore state.
> 23:08:45,810 [Source: TableSourceScan(table=[[default_catalog, default_database, users_csv, watermark=[CAST($3):TIMESTAMP(3)]]], fields=[user_id, user_name, region, timestamp]) (1/1)#0] INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 will start reading the following 2 partitions from the earliest offsets: [KafkaTopicPartition{topic='users_csv', partition=1}, KafkaTopicPartition{topic='users_csv', partition=0}]
> 23:08:45,825 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, users_csv, watermark=[CAST($3):TIMESTAMP(3)]]], fields=[user_id, user_name, region, timestamp]) (1/1)#0] INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='users_csv', partition=1}=-915623761775, KafkaTopicPartition{topic='users_csv', partition=0}=-915623761775}.
> ##[error]The operation was canceled.
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)