[jira] [Created] (FLINK-30438) The generated schema is not correct when using value.format debezium-avro-confluent
Jun Qin created FLINK-30438:
---

Summary: The generated schema is not correct when using value.format debezium-avro-confluent
Key: FLINK-30438
URL: https://issues.apache.org/jira/browse/FLINK-30438
Project: Flink
Issue Type: Bug
Affects Versions: 1.16.0
Reporter: Jun Qin

With the following code:
{code:java}
CREATE TABLE TEST(
    ID BIGINT,
    INTEGRATION_ID STRING,
    PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
) WITH(
    'connector' = 'kafka',
    'topic' = 'TEST',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'TEST',
    'key.format' = 'avro-confluent',
    'key.fields' = 'INTEGRATION_ID',
    'key.avro-confluent.url' = 'http://schema-registry:8081',
    'value.format' = 'debezium-avro-confluent',
    'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
    'scan.startup.mode' = 'earliest-offset'
);
{code}
and this INSERT statement:
{code:java}
INSERT INTO TEST SELECT 1, '1';
{code}
the schema we get in the schema registry is:
{code:java}
[
  "null",
  {
    "fields": [
      {
        "default": null,
        "name": "before",
        "type": [
          "null",
          {
            "fields": [
              { "default": null, "name": "ID", "type": [ "null", "long" ] },
              { "name": "INTEGRATION_ID", "type": "string" }
            ],
            "name": "record_before",
            "type": "record"
          }
        ]
      },
      {
        "default": null,
        "name": "after",
        "type": [
          "null",
          {
            "fields": [
              { "default": null, "name": "ID", "type": [ "null", "long" ] },
              { "name": "INTEGRATION_ID", "type": "string" }
            ],
            "name": "record_after",
            "type": "record"
          }
        ]
      },
      { "default": null, "name": "op", "type": [ "null", "string" ] }
    ],
    "name": "record",
    "namespace": "org.apache.flink.avro.generated",
    "type": "record"
  }
]
{code}
The first "null" in the schema, which makes the whole Debezium record nullable, does not look correct. Can you check and fix?
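For comparison, the expected top-level schema would presumably be the record itself rather than a nullable union (an assumption based on the report above, not a confirmed fix):
{code:java}
{
  "type": "record",
  "name": "record",
  "namespace": "org.apache.flink.avro.generated",
  "fields": [ ... the same "before", "after" and "op" fields as above ... ]
}
{code}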
[jira] [Created] (FLINK-30220) Hide user credentials in Flink SQL JDBC connector
Jun Qin created FLINK-30220:
---

Summary: Hide user credentials in Flink SQL JDBC connector
Key: FLINK-30220
URL: https://issues.apache.org/jira/browse/FLINK-30220
Project: Flink
Issue Type: Improvement
Reporter: Jun Qin

Similar to FLINK-28028, when using the Flink SQL JDBC connector, we should also have a way to secure the username and the password used in the DDL:
{code:java}
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users',
   'username' = 'a-username',
   'password' = 'a-password'
);
{code}
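Until there is built-in support, a minimal workaround sketch, assuming the DDL is assembled in Java at submission time ({{JDBC_USER}}/{{JDBC_PASS}} are hypothetical environment variable names). This keeps credentials out of the SQL source text, though they still end up in the table options:
{code:java}
// Hypothetical workaround sketch: inject credentials from environment variables
// instead of hard-coding them in the DDL text.
// tableEnv: an existing org.apache.flink.table.api.TableEnvironment
String ddl = String.format(
        "CREATE TABLE MyUserTable (id BIGINT, name STRING, PRIMARY KEY (id) NOT ENFORCED) "
                + "WITH ('connector' = 'jdbc', "
                + "'url' = 'jdbc:mysql://localhost:3306/mydatabase', "
                + "'table-name' = 'users', "
                + "'username' = '%s', 'password' = '%s')",
        System.getenv("JDBC_USER"),   // assumed env var name
        System.getenv("JDBC_PASS"));  // assumed env var name
tableEnv.executeSql(ddl);
{code}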
[jira] [Created] (FLINK-28834) Add TemporalJoin example and ITCase
Jun Qin created FLINK-28834:
---

Summary: Add TemporalJoin example and ITCase
Key: FLINK-28834
URL: https://issues.apache.org/jira/browse/FLINK-28834
Project: Flink
Issue Type: Improvement
Reporter: Jun Qin
Assignee: Jun Qin

A temporal join example is useful to show users how to use temporal joins and how they work. The corresponding ITCase also helps to verify the temporal join functionality in Flink SQL and to show users how to do Flink SQL testing.
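A minimal sketch of what such an example could demonstrate (table and column names are made up for illustration; {{Orders}} and {{CurrencyRates}} would need to be declared first, with {{CurrencyRates}} being a versioned table):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Enrich each order with the currency rate valid at the order's event time.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
        "SELECT o.order_id, o.price * r.rate AS converted_price "
                + "FROM Orders AS o "
                + "JOIN CurrencyRates FOR SYSTEM_TIME AS OF o.order_time AS r "
                + "ON o.currency = r.currency");
{code}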
[jira] [Created] (FLINK-28650) Flink SQL Parsing bug for METADATA
Jun Qin created FLINK-28650:
---

Summary: Flink SQL Parsing bug for METADATA
Key: FLINK-28650
URL: https://issues.apache.org/jira/browse/FLINK-28650
Project: Flink
Issue Type: Bug
Affects Versions: 1.14.4
Reporter: Jun Qin

With the following source/sink tables:
{code:sql}
CREATE TABLE sourceTable (
    `key` INT,
    `time` TIMESTAMP(3),
    `value` STRING NOT NULL,
    id INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='10',
    'fields.id.kind'='sequence',
    'fields.id.start'='1',
    'fields.id.end'='100'
);

CREATE TABLE sinkTable1 (
    `time` TIMESTAMP(3) METADATA FROM 'timestamp',
    `value` STRING NOT NULL
) WITH (
    'connector' = 'kafka',
    ...
)

CREATE TABLE sinkTable2 (
    `time` TIMESTAMP(3),    -- without METADATA
    `value` STRING NOT NULL
) WITH (
    'connector' = 'kafka',
    ...
)
{code}
the following three statements pass the validation:
{code:sql}
INSERT INTO sinkTable1 SELECT `time`, `value` FROM sourceTable;
INSERT INTO sinkTable2 SELECT `time`, `value` FROM sourceTable;
INSERT INTO sinkTable2 (`time`,`value`) SELECT `time`, `value` FROM sourceTable;
{code}
but this one does not:
{code:sql}
INSERT INTO sinkTable1 (`time`,`value`) SELECT `time`, `value` FROM sourceTable;
{code}
It fails with:
{code:java}
Unknown target column 'time'
{code}
It seems that when column names are provided in the INSERT statement, METADATA columns have an undesired effect.
[jira] [Created] (FLINK-27473) Capture time that a job spends on initializing tasks
Jun Qin created FLINK-27473:
---

Summary: Capture time that a job spends on initializing tasks
Key: FLINK-27473
URL: https://issues.apache.org/jira/browse/FLINK-27473
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination, Runtime / Metrics
Reporter: Jun Qin

Similar to https://issues.apache.org/jira/browse/FLINK-25888, we should capture this also for initializing tasks.
[jira] [Created] (FLINK-27173) CoGroupedStreams$TaggedUnion cannot be used as a POJO type
Jun Qin created FLINK-27173:
---

Summary: CoGroupedStreams$TaggedUnion cannot be used as a POJO type
Key: FLINK-27173
URL: https://issues.apache.org/jira/browse/FLINK-27173
Project: Flink
Issue Type: Bug
Components: API / Type Serialization System
Affects Versions: 1.14.4
Reporter: Jun Qin
Attachments: StreamingJob.java

Attached is the code to demonstrate the issue. When running the job, we can see the following:
{code:java}
11:48:26,584 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.streaming.api.datastream.CoGroupedStreams$TaggedUnion does not contain a setter for field one
11:48:26,586 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.streaming.api.datastream.CoGroupedStreams$TaggedUnion cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
{code}
TaggedUnion is a class in Flink, so this should be fixed in Flink. [^StreamingJob.java]
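For context, a minimal sketch of the kind of job that triggers these log lines (element types are made up here; the attached StreamingJob.java is the actual reproducer). {{coGroup}} internally wraps both inputs in {{CoGroupedStreams.TaggedUnion}}, which the TypeExtractor then rejects as a POJO:
{code:java}
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// env: an existing StreamExecutionEnvironment
DataStream<Tuple2<String, Integer>> left  = env.fromElements(Tuple2.of("a", 1));
DataStream<Tuple2<String, Integer>> right = env.fromElements(Tuple2.of("a", 2));

left.coGroup(right)
    .where(t -> t.f0, Types.STRING)
    .equalTo(t -> t.f0, Types.STRING)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first,
                            Iterable<Tuple2<String, Integer>> second,
                            Collector<String> out) {
            out.collect(first + " / " + second);
        }
    });
{code}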
[jira] [Created] (FLINK-26018) Late events in the new KafkaSource
Jun Qin created FLINK-26018:
---

Summary: Late events in the new KafkaSource
Key: FLINK-26018
URL: https://issues.apache.org/jira/browse/FLINK-26018
Project: Flink
Issue Type: Bug
Reporter: Jun Qin
Attachments: message in kafka.txt, taskmanager_10.28.0.131_33249-b3370c_log

There is an issue with the new KafkaSource connector in Flink 1.14: when one task consumes messages from multiple topic partitions (statically created, timestamps in order), it may start with one partition and advance watermarks before the data from the other partitions arrives. In this case, the early messages in the other partitions may unnecessarily be considered late. I discussed with [~renqs]; it seems that the new KafkaSource only adds a partition into {{WatermarkMultiplexer}} when it receives data from that partition. In contrast, FlinkKafkaConsumer adds all known partitions before it fetches any data.

Attached are two files: the messages in Kafka and the corresponding TM logs.
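A sketch of the affected setup, assuming the Flink 1.14 KafkaSource API (broker address, topic, group, and out-of-orderness bound are placeholders):
{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")  // several statically created partitions, in-order timestamps
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Watermarks are combined per task; if only one partition has delivered data so
// far, its watermark may already exceed the first timestamps of the others.
// env: an existing StreamExecutionEnvironment
env.fromSource(source,
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
        "KafkaSource");
{code}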
[jira] [Created] (FLINK-25160) Make doc clear: tolerable-failed-checkpoints counts consecutive failures
Jun Qin created FLINK-25160:
---

Summary: Make doc clear: tolerable-failed-checkpoints counts consecutive failures
Key: FLINK-25160
URL: https://issues.apache.org/jira/browse/FLINK-25160
Project: Flink
Issue Type: Improvement
Components: Documentation
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Jun Qin

According to the code, tolerable-failed-checkpoints counts the consecutive failures. We should make this clear in the [configuration doc|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/].
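For illustration, the setting and its programmatic equivalent (the value 3 is an arbitrary example); per the code observation above, any successful checkpoint resets the counter:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
// Equivalent to execution.checkpointing.tolerable-failed-checkpoints: 3 --
// the job fails only after 3 *consecutive* checkpoint failures.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
{code}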
[jira] [Created] (FLINK-24686) Make doc clear on AsyncFunction::timeout() overriding
Jun Qin created FLINK-24686:
---

Summary: Make doc clear on AsyncFunction::timeout() overriding
Key: FLINK-24686
URL: https://issues.apache.org/jira/browse/FLINK-24686
Project: Flink
Issue Type: Improvement
Components: Documentation
Reporter: Jun Qin
Assignee: Jun Qin

Sometimes a user overrides {{AsyncFunction::timeout()}} with an empty method or with only logging code. As a result, the timeout is never signaled back to the framework, and the job gets stuck, especially when using {{orderedWait()}}. Opening this Jira to make the doc clear on this.
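A sketch of what the doc could recommend (the type parameters are hypothetical): if the override neither completes the {{ResultFuture}} nor throws, the framework never learns about the timeout; completing it exceptionally keeps the pipeline moving or fails fast:
{code:java}
import java.util.concurrent.TimeoutException;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class MyAsyncFunction implements AsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // ... issue the asynchronous request and eventually complete resultFuture ...
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // Do not just log and return: signal the timeout back to the framework.
        resultFuture.completeExceptionally(
                new TimeoutException("Async operation timed out for input: " + input));
    }
}
{code}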
[jira] [Created] (FLINK-24543) Zookeeper connection issue causes inconsistent state in Flink
Jun Qin created FLINK-24543:
---

Summary: Zookeeper connection issue causes inconsistent state in Flink
Key: FLINK-24543
URL: https://issues.apache.org/jira/browse/FLINK-24543
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.13.2
Reporter: Jun Qin

Env: Flink 1.13.2 with Zookeeper HA

Here is what happened:
{code:bash}
# checkpoint 1116 was triggered
2021-10-09 00:16:49,555 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1116 (type=CHECKPOINT) @ 1633738609538 for job a8a4fb85b681a897ba118db64333c9e5.

# a few seconds later, the ZooKeeper connection was suspended (it turned out to be a disk issue on the ZooKeeper side that caused slow fsync and commit)
2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper suspended. The contender LeaderContender: DefaultDispatcherRunner no longer participates in the leader election.

# the job was switching to suspended
2021-10-09 00:16:58,564 [flink-akka.actor.default-dispatcher-61] INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5 from the resource manager.
2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-92] INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5.
2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job Flink ...(a8a4fb85b681a897ba118db64333c9e5).
2021-10-09 00:16:58,567 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink ... (a8a4fb85b681a897ba118db64333c9e5) switched from state RUNNING to SUSPENDED.
2021-10-09 00:16:58,570 [flink-akka.actor.default-dispatcher-86] INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}.
2021-10-09 00:16:58,667 [flink-akka.actor.default-dispatcher-92] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job a8a4fb85b681a897ba118db64333c9e5 reached terminal state SUSPENDED.

# the ZooKeeper connection was restored
2021-10-09 00:17:08,225 [Curator-ConnectionStateManager-0] INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.

# received a checkpoint acknowledgement and tried to finalize the checkpoint, which then failed to be added to ZooKeeper due to KeeperException$NodeExistsException
2021-10-09 00:17:14,382 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ... (1/5) (09d25852e3e206d6b7fe0d6bc965870f) switched from RUNNING to CANCELING.
2021-10-09 00:17:14,382 [jobmanager-future-thread-1] WARN  org.apache.flink.runtime.jobmaster.JobMaster [] - Error while processing AcknowledgeCheckpoint message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the pending checkpoint 1116. Failure reason: Failure to finalize checkpoint.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1072)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at
{code}
[jira] [Created] (FLINK-24310) A bug in the BufferingSink example in the doc
Jun Qin created FLINK-24310:
---

Summary: A bug in the BufferingSink example in the doc
Key: FLINK-24310
URL: https://issues.apache.org/jira/browse/FLINK-24310
Project: Flink
Issue Type: Bug
Components: Documentation
Reporter: Jun Qin

The following line in the BufferingSink on [this page|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#operator-state] has a bug: {{if (bufferedElements.size() == threshold) {}}

It should be {{>=}} instead of {{==}}, because when restoring from a checkpoint during downscaling, the task may get more elements than the threshold.
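The corrected condition would look like this (a sketch based on the doc's BufferingSink, which buffers {{Tuple2<String, Integer>}} elements):
{code:java}
// Flush on "greater or equal": after a downscale restore the buffer can already
// hold more than `threshold` elements, and strict equality would never trigger.
if (bufferedElements.size() >= threshold) {
    for (Tuple2<String, Integer> element : bufferedElements) {
        // send it to the sink
    }
    bufferedElements.clear();
}
{code}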
[jira] [Created] (FLINK-24238) Page title missing
Jun Qin created FLINK-24238:
---

Summary: Page title missing
Key: FLINK-24238
URL: https://issues.apache.org/jira/browse/FLINK-24238
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.13.2
Reporter: Jun Qin

The page title is missing on this Flink doc page: [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/versioned_tables/]. [This one|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/] is a good example.
[jira] [Created] (FLINK-23429) State Processor API failed with FileNotFoundException when working with state files on Cloud Storage
Jun Qin created FLINK-23429:
---

Summary: State Processor API failed with FileNotFoundException when working with state files on Cloud Storage
Key: FLINK-23429
URL: https://issues.apache.org/jira/browse/FLINK-23429
Project: Flink
Issue Type: Bug
Components: API / State Processor
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin

For example:
{code:java}
Caused by: java.io.FileNotFoundException: /savepoints/savepoint-18cf55-d90c1b6b1d12/c965e4fd-9647-4f25-b4cd-5ce0485759fd (No such file or directory)
    at java.io.FileInputStream.open0(Native Method) ~[?:?]
    at java.io.FileInputStream.open(FileInputStream.java:219) ~[?:?]
    at java.io.FileInputStream.<init>(FileInputStream.java:157) ~[?:?]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61) ~[?:?]
    at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34) ~[?:?]
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.Thread.run(Thread.java:834) ~[?:?]
{code}
However, the actual files to be copied do exist in the source savepoint.
[jira] [Created] (FLINK-23411) Expose Flink checkpoint details metrics
Jun Qin created FLINK-23411:
---

Summary: Expose Flink checkpoint details metrics
Key: FLINK-23411
URL: https://issues.apache.org/jira/browse/FLINK-23411
Project: Flink
Issue Type: Improvement
Components: Runtime / Metrics
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin

The checkpoint metrics shown in the Flink Web UI, like the sync/async/alignment/start-delay durations, are not exposed to the metrics system. This makes problem investigation harder when the Web UI is not enabled: those numbers cannot be found in the DEBUG logs. I think we should see how we can expose these metrics.
[jira] [Created] (FLINK-23410) Use a pool of KafkaProducers to commit Kafka Transactions
Jun Qin created FLINK-23410:
---

Summary: Use a pool of KafkaProducers to commit Kafka Transactions
Key: FLINK-23410
URL: https://issues.apache.org/jira/browse/FLINK-23410
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin

Currently, {{FlinkKafkaProducer}} contains {{kafkaProducersPoolSize}} (5 by default), but {{kafkaProducersPoolSize}} is only used to calculate the next transactionalIds; there is actually no KafkaProducer pool in {{FlinkKafkaProducer}}. This means that for every checkpoint, Flink creates a new KafkaProducer (and therefore a new thread) and gets a new producer id from Kafka before it can initialize/commit a transaction. When the checkpoint is complete and the transaction is committed, the thread is shut down. This is inefficient not only in terms of Flink's CPU usage (shutting down and recreating threads) but also in terms of the network communication to Kafka (re-requesting the producer id from Kafka). This JIRA is opened to actually implement the KafkaProducer pool.
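A minimal sketch of the pooling idea (not the actual {{FlinkKafkaProducer}} internals): producers are created once and reused across checkpoints, so their threads and Kafka-assigned producer ids survive. In a real transactional setup each pooled instance would need its own {{transactional.id}}:
{code:java}
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;

class ProducerPool {
    private final BlockingQueue<KafkaProducer<byte[], byte[]>> pool;

    // config must include the serializers (and, for transactions, a distinct
    // transactional.id per instance, omitted in this sketch)
    ProducerPool(int poolSize, Properties config) {
        this.pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            // created once and reused, instead of one new producer per transaction
            pool.add(new KafkaProducer<>(config));
        }
    }

    KafkaProducer<byte[], byte[]> borrow() throws InterruptedException {
        return pool.take();
    }

    void release(KafkaProducer<byte[], byte[]> producer) {
        pool.add(producer);
    }
}
{code}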
[jira] [Created] (FLINK-23201) The check on alignmentDurationNanos seems to be too strict
Jun Qin created FLINK-23201:
---

Summary: The check on alignmentDurationNanos seems to be too strict
Key: FLINK-23201
URL: https://issues.apache.org/jira/browse/FLINK-23201
Project: Flink
Issue Type: Bug
Components: Runtime / Metrics
Affects Versions: 1.12.2
Reporter: Jun Qin

The check on alignmentDurationNanos seems to be too strict at this line: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java#L74

This may cause a job to fail when doing stop-with-savepoint.
[jira] [Created] (FLINK-22647) The time unit of state access latency on the Flink UI is incorrect
Jun Qin created FLINK-22647:
---

Summary: The time unit of state access latency on the Flink UI is incorrect
Key: FLINK-22647
URL: https://issues.apache.org/jira/browse/FLINK-22647
Project: Flink
Issue Type: Bug
Components: Runtime / Web Frontend
Affects Versions: 1.13.0
Reporter: Jun Qin
Attachments: Screen Shot 2021-05-12 at 11.34.11 AM.png

See the attached screenshot: the number is actually 7000 nanoseconds, but the UI shows 7s. I suggest removing the unit on the UI for now to avoid confusion, until there is a way to forward the unit to the UI in Flink.
[jira] [Created] (FLINK-22642) UI issue with flame graph
Jun Qin created FLINK-22642:
---

Summary: UI issue with flame graph
Key: FLINK-22642
URL: https://issues.apache.org/jira/browse/FLINK-22642
Project: Flink
Issue Type: Bug
Components: Runtime / Web Frontend
Affects Versions: 1.13.0
Reporter: Jun Qin
Attachments: Screen Shot 2021-05-11 at 10.47.21 PM.png, Screen Shot 2021-05-11 at 10.50.29 PM.png

There is a minor issue with the flame graph on the Flink 1.13 UI: *often* I somehow manage to "pin" the tooltip popups (i.e., the black boxes). They stay there even if I switch to another tab; see the attached screenshots. I do not know how I did it: when I tried to do it intentionally, I could not reproduce it. Refreshing the browser tab gets rid of them.
[jira] [Created] (FLINK-21336) Activate bloom filter in RocksDB State Backend via Flink configuration
Jun Qin created FLINK-21336:
---

Summary: Activate bloom filter in RocksDB State Backend via Flink configuration
Key: FLINK-21336
URL: https://issues.apache.org/jira/browse/FLINK-21336
Project: Flink
Issue Type: Improvement
Reporter: Jun Qin
Assignee: Jun Qin

Activating the bloom filter in the RocksDB state backend improves read performance. Currently, activating the bloom filter can only be done by implementing a custom ConfigurableRocksDBOptionsFactory. I think we should provide an option to activate the bloom filter via Flink configuration.

See also the discussion on the ML: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Activate-bloom-filter-in-RocksDB-State-Backend-via-Flink-configuration-td48636.html
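For reference, a sketch of the custom factory mentioned above (10 bits per key is an arbitrary example; depending on the bundled RocksDB version the setter may be {{setFilter}} instead of {{setFilterPolicy}}):
{code:java}
import java.util.Collection;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BloomFilterOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        BloomFilter bloomFilter = new BloomFilter(10, false); // 10 bits per key, full filter
        handlesToClose.add(bloomFilter);                      // let Flink dispose the native handle
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setFilterPolicy(bloomFilter);
        return currentOptions.setTableFormatConfig(tableConfig);
    }

    @Override
    public RocksDBOptionsFactory configure(ReadableConfig configuration) {
        return this; // nothing configurable in this sketch
    }
}
{code}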
[jira] [Created] (FLINK-21315) Support to set operator names in state processor API
Jun Qin created FLINK-21315:
---

Summary: Support to set operator names in state processor API
Key: FLINK-21315
URL: https://issues.apache.org/jira/browse/FLINK-21315
Project: Flink
Issue Type: Improvement
Reporter: Jun Qin

Currently, it is not possible to set a user-friendly operator name when using the state processor API. For example, when you use {{readKeyedState()}}, the operator name shown on the Flink UI is:

{{DataSource (at readKeyedState(ExistingSavepoint.java:282) (org.apache.flink.state.api.input.KeyedStateInputFormat))}}

The same long name is shown in Grafana when Flink metrics are displayed. This Jira aims to provide users an option to set operator names in the state processor API.
[jira] [Created] (FLINK-19008) Flink Job runs slow after restore + downscale from an incremental checkpoint (rocksdb)
Jun Qin created FLINK-19008:
---

Summary: Flink Job runs slow after restore + downscale from an incremental checkpoint (rocksdb)
Key: FLINK-19008
URL: https://issues.apache.org/jira/browse/FLINK-19008
Project: Flink
Issue Type: Improvement
Reporter: Jun Qin

A customer runs a Flink job with the RocksDB state backend. Checkpoints are retained and done incrementally. The state size is several TB. When they restore + downscale from a retained checkpoint, although downloading the checkpoint files took only ~20 min, the job throughput returned to the expected level only after 3 hours. I do not have RocksDB logs. The suspicion is that those 3 hours are due to heavy RocksDB compaction and/or flushes, as it was observed that checkpoints could not finish fast enough due to a long {{checkpoint duration (sync)}}.

How can we make this restore phase shorter? For compaction, I think it is worth checking the improvement of:
{code:java}
CompactionPri compaction_pri = kMinOverlappingRatio;
{code}
which has been made the default in RocksDB 6.x:
{code:java}
// In Level-based compaction, it Determines which file from a level to be
// picked to merge to the next level. We suggest people try
// kMinOverlappingRatio first when you tune your database.
enum CompactionPri : char {
  // Slightly prioritize larger files by size compensated by #deletes
  kByCompensatedSize = 0x0,
  // First compact files whose data's latest update time is oldest.
  // Try this if you only update some hot keys in small ranges.
  kOldestLargestSeqFirst = 0x1,
  // First compact files whose range hasn't been compacted to the next level
  // for the longest. If your updates are random across the key space,
  // write amplification is slightly better with this option.
  kOldestSmallestSeqFirst = 0x2,
  // First compact files whose ratio between overlapping size in next level
  // and its size is the smallest. It in many cases can optimize write
  // amplification.
  kMinOverlappingRatio = 0x3,
};
...
// Default: kMinOverlappingRatio
CompactionPri compaction_pri = kMinOverlappingRatio;
{code}
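In Flink, this could presumably be tried from a custom RocksDB options factory (a sketch only; naming follows the RocksDB Java API, and the exact enum constant casing may vary by version):
{code:java}
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;

// Set the compaction priority corresponding to kMinOverlappingRatio above.
ColumnFamilyOptions columnOptions = new ColumnFamilyOptions()
        .setCompactionPriority(CompactionPriority.MinOverlappingRatio);
{code}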
[jira] [Created] (FLINK-18998) No watermark is shown on Flink UI when ProcessingTime is used
Jun Qin created FLINK-18998:
---

Summary: No watermark is shown on Flink UI when ProcessingTime is used
Key: FLINK-18998
URL: https://issues.apache.org/jira/browse/FLINK-18998
Project: Flink
Issue Type: Bug
Reporter: Jun Qin
Attachments: screenshot_2020-08-18_at_10.57.39.png

As stated in the subject, no watermark is shown on the Flink UI when ProcessingTime is used; see the attached screenshot.
[jira] [Created] (FLINK-18702) Flink elasticsearch connector leaks threads and classloaders thereof
Jun Qin created FLINK-18702:
---

Summary: Flink elasticsearch connector leaks threads and classloaders thereof
Key: FLINK-18702
URL: https://issues.apache.org/jira/browse/FLINK-18702
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: 1.10.1
Reporter: Jun Qin

The Flink Elasticsearch connector leaks threads and the classloaders thereof. This results in an OOM in Metaspace when the ES sink fails and is restarted many times. The issue is visible in Flink 1.10 but not in 1.11, because Flink 1.11 does not create new classloaders in case of recoveries ([FLINK-16408|https://issues.apache.org/jira/browse/FLINK-16408]).
[jira] [Created] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
Jun Qin created FLINK-17327:
---

Summary: Kafka unavailability could cause Flink TM shutdown
Key: FLINK-17327
URL: https://issues.apache.org/jira/browse/FLINK-17327
Project: Flink
Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Jun Qin

Steps to reproduce:
# Start a Flink 1.10 standalone cluster
# Run a Flink job which reads from one Kafka topic and writes to another topic, with exactly-once checkpointing enabled
# Stop all Kafka brokers after a few successful checkpoints

When the Kafka brokers are down:
# {{org.apache.kafka.clients.NetworkClient}} reported that the connection to the broker could not be established
# Then, Flink could not complete the snapshot due to {{Timeout expired while initializing transactional state in 6ms}}
# After several snapshot failures, Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}}
# Eventually, Flink tried to cancel the task, which did not succeed within 3 min
# Then {{Fatal error occurred while executing the TaskManager. Shutting it down...}}

I will attach the logs to show the details. It is worth noting that if there is no consumer but only a producer in the task, the behavior is different:
# {{org.apache.kafka.clients.NetworkClient}} reported that the connection to the broker could not be established
# After {{delivery.timeout.ms}} (2 min by default), the producer reported: {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for output-topic-0:120001 ms has passed since batch creation}}
# Flink tried to cancel the upstream tasks and created a new producer
# The new producer obviously reported connectivity issues to the brokers
# This continued until the Kafka brokers were back
# Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}}
# Flink cancelled the tasks and restarted them
# The job continued, and a new checkpoint succeeded
# The TM ran all the time in this scenario

I set the Kafka transaction timeout to 1 hour just to avoid transaction timeouts. To get a producer-only task, I called {{env.disableOperatorChaining();}} in the second scenario.
[jira] [Created] (FLINK-17288) Speedup loading from savepoints into RocksDB by bulk load
Jun Qin created FLINK-17288:
---

Summary: Speedup loading from savepoints into RocksDB by bulk load
Key: FLINK-17288
URL: https://issues.apache.org/jira/browse/FLINK-17288
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Reporter: Jun Qin

When resources are constrained, loading a big savepoint into RocksDB may take some time. This also impacts the job recovery time when the savepoint is used for recovery. Bulk loading from the savepoint should help in this regard. Here is an excerpt from the RocksDB FAQ:
{quote}*Q: What's the fastest way to load data into RocksDB?*

A: A fast way to direct insert data to the DB:
# using single writer thread and insert in sorted order
# batch hundreds of keys into one write batch
# use vector memtable
# make sure options.max_background_flushes is at least 4
# before inserting the data, disable automatic compaction, set options.level0_file_num_compaction_trigger, options.level0_slowdown_writes_trigger and options.level0_stop_writes_trigger to very large. After inserting all the data, issue a manual compaction.

3-5 will be automatically done if you call Options::PrepareForBulkLoad() to your option

If you can pre-process the data offline before inserting. There is a faster way: you can sort the data, generate SST files with non-overlapping ranges in parallel and bulkload the SST files. See [https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files]
{quote}
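For illustration, a sketch of the bulk-load switch via the RocksDB Java API (standalone usage, not wired into Flink; the DB path is a placeholder). {{prepareForBulkLoad()}} covers steps 3-5 of the FAQ:
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class BulkLoadSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                        .setCreateIfMissing(true)
                        .prepareForBulkLoad();   // vector memtable, no auto-compaction, etc.
             RocksDB db = RocksDB.open(options, "/tmp/bulk-load-db")) {
            // ... insert keys in sorted order, batched into WriteBatches ...
            db.compactRange(); // manual compaction after the bulk load
        }
    }
}
{code}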
[jira] [Created] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
Jun Qin created FLINK-16419:
---

Summary: Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
Key: FLINK-16419
URL: https://issues.apache.org/jira/browse/FLINK-16419
Project: Flink
Issue Type: Improvement
Reporter: Jun Qin

When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer tries to recommit all pre-committed transactions which are in the snapshot, even if those transactions were successfully committed before (i.e., the call to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} returned OK). This may lead to recovery failures when recovering from a very old snapshot, because the transactional IDs in that snapshot have expired and been removed from Kafka. For example, the following scenario:
# Start a Flink job with FlinkKafkaProducer sink with exactly-once
# Suspend the Flink job with a savepoint A
# Wait for a time longer than {{transactional.id.expiration.ms}} + {{transaction.remove.expired.transaction.cleanup.interval.ms}}
# Recover the job with savepoint A
# The recovery will fail with the following error:
{noformat}
2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata  - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer  - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
    at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible way is to let the JobManager, after it successfully notifies all operators of the completion of a snapshot (via {{notifyCheckpointComplete}}), record the success, e.g., write the successful transactional IDs somewhere in the snapshot. Those transactions then need not be recommitted upon recovery.
[jira] [Created] (FLINK-15865) When to add .uid() call: inconsistent definition of operators in Flink docs
Jun Qin created FLINK-15865:
---

Summary: When to add .uid() call: inconsistent definition of operators in Flink docs
Key: FLINK-15865
URL: https://issues.apache.org/jira/browse/FLINK-15865
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.9.1
Reporter: Jun Qin

On the one hand, the Flink doc suggests adding a .uid() call for *all* operators in [1]; on the other hand, it lists all operators in Flink in [2]. The issues are:
# KeyBy is listed as an operator, but .keyBy().uid() is not a valid call. The same holds for window(), split(), etc.
# addSource() and addSink() are not listed as operators, but we do expect users to call .uid() after addSource() and addSink(), especially in the exactly-once scenario.

This creates confusion, especially for beginners. There should be a better definition of which/what kind of operators can have a following uid() call.

[1] [Should I assign ids to all operators in my job|https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job]
[2] [Flink Operators|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/]
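To illustrate the inconsistency (the function classes here are hypothetical): {{uid()}} is available on sources, sinks, and transformations such as map(), but not on the KeyedStream returned by keyBy():
{code:java}
// env: an existing StreamExecutionEnvironment
env.addSource(new MySource()).uid("source-id")   // valid, although addSource() is not listed as an operator
   .map(value -> value).uid("map-id")            // valid
   .keyBy(value -> value.getKey())               // listed as an operator, but...
   // .uid("keyby-id")                           // ...KeyedStream has no uid() method
   .process(new MyProcessFunction()).uid("process-id")
   .addSink(new MySink()).uid("sink-id");        // valid
{code}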
[jira] [Created] (FLINK-15298) Wrong dependencies in the DataStream API tutorial (the wiki-edits example)
Jun Qin created FLINK-15298:
---

Summary: Wrong dependencies in the DataStream API tutorial (the wiki-edits example)
Key: FLINK-15298
URL: https://issues.apache.org/jira/browse/FLINK-15298
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.9.1, 1.9.0
Reporter: Jun Qin

[The DataStream API Tutorial in Flink 1.9|https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/datastream_api.html] mentions the following dependencies:
{code:xml}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-wikiedits_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
{code}
There are two issues here:
# {{flink-java}} and {{flink-streaming-java}} should be set to *provided* scope
# {{flink-clients}} is not needed. If {{flink-clients}} is added in *compile* scope, {{flink-runtime}} will be added implicitly
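Applying both fixes, the dependency section would presumably look like this (a sketch of the suggested correction, not the committed doc change):
{code:xml}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-wikiedits_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
{code}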
[jira] [Created] (FLINK-14942) State Processing API: add an option to make deep copy
Jun Qin created FLINK-14942:
---

Summary: State Processing API: add an option to make deep copy
Key: FLINK-14942
URL: https://issues.apache.org/jira/browse/FLINK-14942
Project: Flink
Issue Type: Improvement
Components: API / State Processor
Affects Versions: 1.9.1
Reporter: Jun Qin

Currently, when a new savepoint is created based on a source savepoint, the new savepoint contains references to the source savepoint. The [State Processor API doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html] says:
bq. Note: When basing a new savepoint on existing state, the state processor api makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other!

This JIRA requests an option to make a deep copy (instead of a shallow copy) so that the new savepoint is self-contained.
[jira] [Created] (FLINK-14890) TestHarness for KeyedBroadcastProcessFunction
Jun Qin created FLINK-14890:
---

Summary: TestHarness for KeyedBroadcastProcessFunction
Key: FLINK-14890
URL: https://issues.apache.org/jira/browse/FLINK-14890
Project: Flink
Issue Type: Improvement
Components: Tests
Affects Versions: 1.9.1
Reporter: Jun Qin

To test a {{KeyedCoProcessFunction}}, one can use {{KeyedCoProcessOperator}} together with {{KeyedTwoInputStreamOperatorTestHarness}}. To test a {{KeyedBroadcastProcessFunction}}, there is {{CoBroadcastWithKeyedOperator}}, but the corresponding TestHarness class is missing.
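For comparison, a sketch of the existing pattern for {{KeyedCoProcessFunction}} (the function and types are hypothetical; the harness class comes from the flink-streaming-java test-jar), which currently has no {{KeyedBroadcastProcessFunction}} counterpart:
{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;

// inside a test method declared "throws Exception"
KeyedCoProcessOperator<String, String, String, String> operator =
        new KeyedCoProcessOperator<>(new MyKeyedCoProcessFunction());

KeyedTwoInputStreamOperatorTestHarness<String, String, String, String> harness =
        new KeyedTwoInputStreamOperatorTestHarness<>(
                operator, value -> value, value -> value, BasicTypeInfo.STRING_TYPE_INFO);

harness.open();
harness.processElement1(new StreamRecord<>("a", 1L));
harness.processElement2(new StreamRecord<>("b", 2L));
// assert on harness.getOutput() ...
{code}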
[jira] [Created] (FLINK-14054) Enable checkpointing via job configuration
Jun Qin created FLINK-14054:
---

Summary: Enable checkpointing via job configuration
Key: FLINK-14054
URL: https://issues.apache.org/jira/browse/FLINK-14054
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration
Reporter: Jun Qin

Currently, enabling checkpointing can only be done in the job code; see the following quote from the Flink [checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing] doc:
{quote}By default, checkpointing is disabled. To enable checkpointing, call {{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ is the checkpoint interval in milliseconds.
{quote}
This makes enabling checkpointing after the job code has been released difficult: one has to change and rebuild the job code. In addition, making checkpointing configurable is of interest not only to developers but also to operations teams:
* They may want to enable checkpointing in production but disable it in test (e.g., to save storage space)
* They may want to try out with and without checkpointing to evaluate the impact on job behaviour and performance.

Hence this request. Thanks.
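For reference, the current code-only way quoted above (the 10-second interval is an arbitrary example); the request is to allow the same via configuration, without touching the job code:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // checkpoint every 10 seconds -- currently only possible in code
{code}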