[jira] [Created] (FLINK-35419) scan.bounded.latest-offset makes queries never finish if the latest message is an EndTxn Kafka marker
Fabian Paul created FLINK-35419: --- Summary: scan.bounded.latest-offset makes queries never finish if the latest message is an EndTxn Kafka marker Key: FLINK-35419 URL: https://issues.apache.org/jira/browse/FLINK-35419 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.19.0, 1.17.0, 1.16.0, 1.8.0 Reporter: Fabian Paul When running the Kafka connector in bounded mode, the stop condition can be defined as the latest offset at the time the job starts. Unfortunately, Kafka's latest offset also counts special marker records, such as transaction markers. To decide whether a job has finished, Flink compares the number of records read so far with the original latest offset [1]. Since the consumer never sees the special marker records, the latest offset is never reached and the job gets stuck. To reproduce the issue, write into a Kafka topic and make sure that the latest record is a transaction end marker. Afterwards, start a Flink job configured with `scan.bounded.latest-offset` pointing to that topic. [1] https://github.com/confluentinc/flink/blob/59c5446c4aac0d332a21b456f4a3f82576104b80/flink-connectors/confluent-connector-kafka/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L128 -- This message was sent by Atlassian Jira (v8.20.10#820010)
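The stuck stop condition described in FLINK-35419 can be illustrated with a small plain-Java simulation. This is only a sketch and not the connector's code: `BoundedReadSketch`, `LogEntry` and both check methods are hypothetical names, and the second check assumes that the consumer's position advances past marker records even though they are never handed to the application.

```java
import java.util.List;

/** Plain-Java simulation; no Kafka or Flink classes are used. */
final class BoundedReadSketch {

    /** One entry in a simulated partition log (hypothetical type). */
    static final class LogEntry {
        final long offset;
        final boolean controlMarker;

        LogEntry(long offset, boolean controlMarker) {
            this.offset = offset;
            this.controlMarker = controlMarker;
        }
    }

    /**
     * Stop check based on the last record handed to the application.
     * Markers are never handed over, so a trailing marker keeps this false.
     */
    static boolean finishedByLastRecord(List<LogEntry> log, long stoppingOffset) {
        long lastSeen = -1L;
        for (LogEntry entry : log) {
            if (!entry.controlMarker) {
                lastSeen = entry.offset;
            }
        }
        return lastSeen >= stoppingOffset - 1;
    }

    /** Stop check based on the read position, which also moves past markers. */
    static boolean finishedByPosition(List<LogEntry> log, long stoppingOffset) {
        long position = 0L;
        for (LogEntry entry : log) {
            position = entry.offset + 1; // advances even when the entry is skipped
        }
        return position >= stoppingOffset;
    }

    public static void main(String[] args) {
        // Two data records followed by a transaction end marker at offset 2.
        List<LogEntry> log = List.of(
                new LogEntry(0, false), new LogEntry(1, false), new LogEntry(2, true));
        long stoppingOffset = 3; // "latest offset" captured when the job started

        System.out.println("record-based check finished: " + finishedByLastRecord(log, stoppingOffset));
        System.out.println("position-based check finished: " + finishedByPosition(log, stoppingOffset));
    }
}
```

In this simulation the record-based check never reports completion, while the position-based one does. Whether a position-based check is the right fix for the connector is left open here.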
[jira] [Created] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint
Fabian Paul created FLINK-30238: --- Summary: Unified Sink committer does not clean up state on final savepoint Key: FLINK-30238 URL: https://issues.apache.org/jira/browse/FLINK-30238 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.3, 1.17.0, 1.16.1 Reporter: Fabian Paul During stop-with-savepoint the committer only commits the pending committables on notifyCheckpointComplete. This has several downsides.
* The last committableSummary has the checkpoint id Long.MAX_VALUE and is never cleared from the state. As a result, stop-with-savepoint does not work when the pipeline recovers from a savepoint.
* While the committables are committed during stop-with-savepoint, they are not forwarded to the post-commit topology, which potentially loses data and prevents open transactions from being closed.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29583) Ensure correct subtaskId and checkpointId is set during committer state migration
Fabian Paul created FLINK-29583: --- Summary: Ensure correct subtaskId and checkpointId is set during committer state migration Key: FLINK-29583 URL: https://issues.apache.org/jira/browse/FLINK-29583 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.2, 1.16.0, 1.17.0 Reporter: Fabian Paul We already discovered two problems during recovery of committers when a post-commit topology is used:
* https://issues.apache.org/jira/browse/FLINK-29509
* https://issues.apache.org/jira/browse/FLINK-29512
Both problems also apply when recovering the committer state of Flink 1.14 unified sinks and migrating it to the extended unified model. As part of this ticket we should fix both issues for the migration and also increase the test coverage for the migration cases, i.e. add test cases in CommitterOperatorTest and CommittableCollectorSerializerTest.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29512) Align SubtaskCommittableManager checkpointId with CheckpointCommittableManagerImpl checkpointId during recovery
Fabian Paul created FLINK-29512: --- Summary: Align SubtaskCommittableManager checkpointId with CheckpointCommittableManagerImpl checkpointId during recovery Key: FLINK-29512 URL: https://issues.apache.org/jira/browse/FLINK-29512 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.2, 1.17.0, 1.16.1 Reporter: Fabian Paul Similar to the issue described in https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of committables the subtaskCommittables checkpointId is always set to 1 [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193], while the holding CheckpointCommittableManager is initialized with the checkpointId that is written into state [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155]. As a result, during a recovery the post-commit topology receives a committable summary with the recovered checkpoint id and multiple `CommittableWithLineage`s with the reset checkpointId. These orphaned `CommittableWithLineage`s have no matching `CommittableSummary`, which fails the job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29509) Set correct subtaskId during recovery of committables
Fabian Paul created FLINK-29509: --- Summary: Set correct subtaskId during recovery of committables Key: FLINK-29509 URL: https://issues.apache.org/jira/browse/FLINK-29509 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.2, 1.17.0, 1.16.1 Reporter: Fabian Paul When we recover the `CheckpointCommittableManager` we ignore the subtaskId it is recovered on. [https://github.com/apache/flink/blob/d191bda7e63a2c12416cba56090e5cd75426079b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java#L58] This becomes a problem when a sink uses a post-commit topology because multiple committer operators might forward committable summaries coming from the same subtaskId. It should be possible to use the subtaskId already present in the `CommittableCollector` when creating the `CheckpointCommittableManager`s. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-27493) Forward all numeric Kafka metrics to Flink's metrics system
Fabian Paul created FLINK-27493: --- Summary: Forward all numeric Kafka metrics to Flink's metrics system Key: FLINK-27493 URL: https://issues.apache.org/jira/browse/FLINK-27493 Project: Flink Issue Type: New Feature Components: Connectors / Kafka Affects Versions: 1.16.0 Reporter: Fabian Paul With the upgrade of the Kafka version to 2.8, it is now possible to access more metrics from the KafkaConsumer/KafkaProducer. So far we only forward metrics that are of type Double and ignore other numeric values. It might be worthwhile to forward all numeric metrics. -- This message was sent by Atlassian Jira (v8.20.7#820007)
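A minimal sketch of the idea in FLINK-27493, assuming metric values are exposed as plain `Object`s: instead of checking for `Double` only, the filter accepts any `Number`. The class `NumericMetricForwarder`, its method and the metric names are hypothetical and stand in for the connector's real metric wiring.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Plain-Java sketch; does not use the Kafka or Flink metric classes. */
final class NumericMetricForwarder {

    /** Keeps every metric whose current value is a Number and exposes it as a double gauge. */
    static Map<String, Supplier<Double>> forwardNumeric(Map<String, Supplier<Object>> kafkaMetrics) {
        Map<String, Supplier<Double>> gauges = new LinkedHashMap<>();
        for (Map.Entry<String, Supplier<Object>> entry : kafkaMetrics.entrySet()) {
            Supplier<Object> source = entry.getValue();
            if (source.get() instanceof Number) { // Double, Long, Integer, ...
                gauges.put(entry.getKey(), () -> {
                    Object value = source.get();
                    // Guard against a metric that changes its value type later on.
                    return value instanceof Number ? ((Number) value).doubleValue() : Double.NaN;
                });
            }
        }
        return gauges;
    }

    public static void main(String[] args) {
        Map<String, Supplier<Object>> kafkaMetrics = new LinkedHashMap<>();
        kafkaMetrics.put("records-lag-max", () -> 12.5); // Double, forwarded today
        kafkaMetrics.put("commit-total", () -> 7L);      // Long, would be dropped by a Double-only filter
        kafkaMetrics.put("client-version", () -> "n/a"); // non-numeric, still skipped

        forwardNumeric(kafkaMetrics)
                .forEach((name, gauge) -> System.out.println(name + " = " + gauge.get()));
    }
}
```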
[jira] [Created] (FLINK-27486) Reduce ArchUnit violations in connector base module
Fabian Paul created FLINK-27486: --- Summary: Reduce ArchUnit violations in connector base module Key: FLINK-27486 URL: https://issues.apache.org/jira/browse/FLINK-27486 Project: Flink Issue Type: Sub-task Components: Connectors / Common Affects Versions: 1.16.0 Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27484) Reduce ArchUnit violations in the project
Fabian Paul created FLINK-27484: --- Summary: Reduce ArchUnit violations in the project Key: FLINK-27484 URL: https://issues.apache.org/jira/browse/FLINK-27484 Project: Flink Issue Type: Improvement Components: API / Core, Connectors / Common, Runtime / Configuration Reporter: Fabian Paul When ArchUnit was introduced we deliberately ignored the existing violations. This is the umbrella ticket to hold the efforts to reduce the exposure. In the long run, this gives our users clarity about using or not using a certain part of the codebase. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27480) KafkaSources sharing the groupId might lead to InstanceAlreadyExistsException warning
Fabian Paul created FLINK-27480: --- Summary: KafkaSources sharing the groupId might lead to InstanceAlreadyExistsException warning Key: FLINK-27480 URL: https://issues.apache.org/jira/browse/FLINK-27480 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.14.4, 1.15.0, 1.16.0 Reporter: Fabian Paul More and more frequently, users run into an issue caused by not configuring the KafkaSource correctly ([https://stackoverflow.com/questions/72026997/flink-instancealreadyexistsexception-while-migrating-to-the-kafkasource]) and setting non-distinguishable groupIds for the sources. Internally, the KafkaConsumer that is used tries to register with the metric system and incorporates the groupId into the metric name, which leads to a name collision. We should update the documentation to explain the situation properly. -- This message was sent by Atlassian Jira (v8.20.7#820007)
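The name collision behind FLINK-27480 can be reproduced with the JDK's JMX classes alone. The sketch below is an illustration under assumptions: the object name pattern and the ids are hypothetical, and a JDK `Timer` MBean stands in for whatever the Kafka client actually registers.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.timer.Timer;

/** Shows how two clients that derive the same id collide on MBean registration. */
final class MetricNameCollisionSketch {

    /** Returns true if registration succeeded and false on a name collision. */
    static boolean register(MBeanServer server, String clientId) {
        try {
            // Hypothetical naming scheme: the id is part of the MBean name.
            ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
            // Any MBean works here; Timer is used only because it ships with the JDK.
            server.registerMBean(new Timer(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        System.out.println("first source registered:  " + register(server, "my-group-0"));
        System.out.println("second source registered: " + register(server, "my-group-0"));
        System.out.println("distinct id registered:   " + register(server, "orders-my-group-0"));
    }
}
```

Giving each source an id that differs from the others avoids the collision in this sketch, which is the situation the documentation update should explain.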
[jira] [Created] (FLINK-26701) Relocation of connector-base might break user jars due to changed imports
Fabian Paul created FLINK-26701: --- Summary: Relocation of connector-base might break user jars due to changed imports Key: FLINK-26701 URL: https://issues.apache.org/jira/browse/FLINK-26701 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Fix For: 1.15.0 With the introduction of FLINK-25927, every connector now relocates connector-base to better support connector compatibility with different Flink versions. Unfortunately, not all classes in connector-base are used only by connectors; some are supposed to be used directly in the user jar, e.g. DeliveryGuarantee, HybridSource... Since the connectors now relocate the module, the existing imports are broken. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26633) Elasticsearch connector does not report recordsSend metric
Fabian Paul created FLINK-26633: --- Summary: Elasticsearch connector does not report recordsSend metric Key: FLINK-26633 URL: https://issues.apache.org/jira/browse/FLINK-26633 Project: Flink Issue Type: Bug Components: Connectors / ElasticSearch Affects Versions: 1.15.0 Reporter: Fabian Paul Fix For: 1.15.0 As part of a unified sink, it is recommended to report a recordsSend metric so that users can track the number of outgoing records from Flink. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26613) Stateful unified Sink V2 upgrades only work when operator uids are given
Fabian Paul created FLINK-26613: --- Summary: Stateful unified Sink V2 upgrades only work when operator uids are given Key: FLINK-26613 URL: https://issues.apache.org/jira/browse/FLINK-26613 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Fix For: 1.15.0 The documentation [1] guarantees that if a stateful migration fails and no uids were used, it is still possible to bind the recovered state via the uidHash. This ticket should add a uidHash setter for the operators that are affected by the migration (writer, committer, global committer). [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#preconditions -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26516) Sink V2 is not state compatible with Sink V1
Fabian Paul created FLINK-26516: --- Summary: Sink V2 is not state compatible with Sink V1 Key: FLINK-26516 URL: https://issues.apache.org/jira/browse/FLINK-26516 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Fix For: 1.15.0 While working on https://issues.apache.org/jira/browse/FLINK-26173 we decided to split off the state compatibility issue to lower the priority of the behavioral issue, since it does not affect many users. This ticket is solely responsible for fixing the state incompatibility between Sink V1 and Sink V2. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26445) Unified Sink cannot restore committer state
Fabian Paul created FLINK-26445: --- Summary: Unified Sink cannot restore committer state Key: FLINK-26445 URL: https://issues.apache.org/jira/browse/FLINK-26445 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.14.3 Reporter: Fabian Paul In Flink 1.14 the writer and committer operators became one operator in streaming mode. Unfortunately, the state is still associated with the previous physical operator, so during restore the job fails due to the non-restored state of the committer operator. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26416) Release Testing:
Fabian Paul created FLINK-26416: --- Summary: Release Testing: Key: FLINK-26416 URL: https://issues.apache.org/jira/browse/FLINK-26416 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Fix For: 1.15.0 With the introduction of Sink V2, the operator model of the sink changed slightly, so it makes sense to test different upgrade/sanity scenarios. You can take any of the existing sinks in the project. I would recommend the FileSink.
# Run a job with Flink 1.14, take a savepoint, and try to restore and resume with 1.15
# Run a job with Flink 1.15, take a savepoint, and try to restore and resume with 1.15
# Run a bounded job with Flink 1.15
In all cases, please verify that all records have been written at the end of the scenario and that there are no duplicates.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26358) Operator maxParallelism is lost during chaining
Fabian Paul created FLINK-26358: --- Summary: Operator maxParallelism is lost during chaining Key: FLINK-26358 URL: https://issues.apache.org/jira/browse/FLINK-26358 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.14.3, 1.15.0 Reporter: Fabian Paul During the generation of the JobGraph from the StreamGraph, the maxParallelism of the chained operators is lost, so the maxParallelism written to a snapshot might not reflect the real maxParallelism of the operator. If a user now unchains the operator, it is no longer possible to restore the snapshot because the maxParallelism of the operator and that of the snapshot do not match. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26304) GlobalCommitter can receive failed committables
Fabian Paul created FLINK-26304: --- Summary: GlobalCommitter can receive failed committables Key: FLINK-26304 URL: https://issues.apache.org/jira/browse/FLINK-26304 Project: Flink Issue Type: Bug Components: API / DataStream, Connectors / Common Affects Versions: 1.14.3 Reporter: Fabian Paul Fix For: 1.14.4 After the introduction of retrying failed committables in the committer, it is important to forward only committables that have been committed successfully to the GlobalCommitter. Currently, this is ignored and the committer forwards all committables regardless of whether they succeeded [1]. [1] https://github.com/apache/flink/blob/0a76d632f33d9a69df87457a63043bd7f609ed40/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java#L144 -- This message was sent by Atlassian Jira (v8.20.1#820001)
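The behavior FLINK-26304 asks for can be sketched as a commit round that separates successful committables from those that still need a retry. This is a simplified illustration, not the handler's code: `CommitForwardingSketch`, `CommitResult` and the `tryCommit` predicate are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java sketch of "forward only what was committed". */
final class CommitForwardingSketch {

    /** Result of one commit round: what may go downstream and what must be retried. */
    static final class CommitResult<T> {
        final List<T> committed = new ArrayList<>();
        final List<T> toRetry = new ArrayList<>();
    }

    /**
     * Tries to commit every pending committable. Only successful ones land in
     * {@code committed}, which is the list a global committer would receive.
     */
    static <T> CommitResult<T> commit(List<T> pending, Predicate<T> tryCommit) {
        CommitResult<T> result = new CommitResult<>();
        for (T committable : pending) {
            if (tryCommit.test(committable)) {
                result.committed.add(committable);
            } else {
                result.toRetry.add(committable); // kept back, never forwarded
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "b" simulates a committable whose commit fails in this round.
        CommitResult<String> result = commit(List.of("a", "b", "c"), c -> !c.equals("b"));
        System.out.println("forward to global committer: " + result.committed);
        System.out.println("retry later: " + result.toRetry);
    }
}
```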
[jira] [Created] (FLINK-26254) KafkaSink might violate order of sequence numbers and risk exactly-once processing
Fabian Paul created FLINK-26254: --- Summary: KafkaSink might violate order of sequence numbers and risk exactly-once processing Key: FLINK-26254 URL: https://issues.apache.org/jira/browse/FLINK-26254 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.3, 1.15.0 Reporter: Fabian Paul When running the KafkaSink in exactly-once mode with a very low checkpoint interval users are seeing `OutOfOrderSequenceException`. It could be caused by the fact that the connector has a pool of KafkaProducers and the sequence numbers are not shared/reset if a new KafkaProducer tries to write to a partition while the previous KafkaProducer is still occupied for committing. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26119) AsyncSinkWriterStateSerializer needs to be PublicEvolving
Fabian Paul created FLINK-26119: --- Summary: AsyncSinkWriterStateSerializer needs to be PublicEvolving Key: FLINK-26119 URL: https://issues.apache.org/jira/browse/FLINK-26119 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Developers who want to create an AsyncSink are supposed to use the AsyncSinkWriterStateSerializer to create the writer serializer. In view of the upcoming connector externalization, this class needs to be made PublicEvolving to guarantee some stability. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26118) AsyncSinks do not support downscaling with state
Fabian Paul created FLINK-26118: --- Summary: AsyncSinks do not support downscaling with state Key: FLINK-26118 URL: https://issues.apache.org/jira/browse/FLINK-26118 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Currently, the AsyncSinkWriter assumes that it is always restored from exactly one previous writer state, but when the job is downscaled after a snapshot, a writer will receive multiple states. https://github.com/apache/flink/blob/c60eb0c3b4bf7dc045dd7a1da2080c7befebb8dc/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L438 -- This message was sent by Atlassian Jira (v8.20.1#820001)
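One way to picture the downscaling case from FLINK-26118: a writer that receives several recovered states merges all of them into its buffer instead of expecting exactly one. The following is a minimal sketch with hypothetical names (`RestoreOnDownscaleSketch`, `mergeRestoredStates`) in which each state is modeled as a list of buffered entries.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Plain-Java sketch; a writer state is modeled as a list of buffered entries. */
final class RestoreOnDownscaleSketch {

    /** Merges zero, one or many recovered writer states into a single buffer. */
    static <T> List<T> mergeRestoredStates(Collection<List<T>> recoveredStates) {
        List<T> buffer = new ArrayList<>();
        for (List<T> state : recoveredStates) {
            buffer.addAll(state); // keep the entries of every previous writer
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Parallelism was reduced from 2 to 1, so one writer receives both states.
        List<List<String>> recovered = List.of(List.of("r1", "r2"), List.of("r3"));
        System.out.println(mergeRestoredStates(recovered));
    }
}
```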
[jira] [Created] (FLINK-25921) Support different input parallelism for preCommit topology
Fabian Paul created FLINK-25921: --- Summary: Support different input parallelism for preCommit topology Key: FLINK-25921 URL: https://issues.apache.org/jira/browse/FLINK-25921 Project: Flink Issue Type: Sub-task Components: API / DataStream Affects Versions: 1.15.0, 1.16.0 Reporter: Fabian Paul Currently, we assume that the pre-commit topology has the same parallelism as the operator before when inserting the failover region. To support a different parallelism we might need to insert a different identity map to customize the mapping. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25920) Allow receiving updates of CommittableSummary
Fabian Paul created FLINK-25920: --- Summary: Allow receiving updates of CommittableSummary Key: FLINK-25920 URL: https://issues.apache.org/jira/browse/FLINK-25920 Project: Flink Issue Type: Sub-task Components: API / DataStream, Connectors / Common Affects Versions: 1.15.0, 1.16.0 Reporter: Fabian Paul In the case of unaligned checkpoints, it might happen that the checkpoint barrier overtakes the records and an empty committable summary is emitted that needs to be corrected at a later point, when the records arrive. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25919) Sink V2 improvements and followups
Fabian Paul created FLINK-25919: --- Summary: Sink V2 improvements and followups Key: FLINK-25919 URL: https://issues.apache.org/jira/browse/FLINK-25919 Project: Flink Issue Type: Improvement Components: API / DataStream, Connectors / Common Affects Versions: 1.16.0 Reporter: Fabian Paul This is an umbrella ticket for known limitations and improvements we still want to make to Sink V2. https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25857) Add committer metrics to track the status of committables
Fabian Paul created FLINK-25857: --- Summary: Add committer metrics to track the status of committables Key: FLINK-25857 URL: https://issues.apache.org/jira/browse/FLINK-25857 Project: Flink Issue Type: Sub-task Components: Connectors / Common Reporter: Fabian Paul With Sink V2 we can now track the progress of a committable during committing and show metrics about the committing status (e.g. failed, retried, succeeded). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25726) Implement GlobalCommitter as custom post commit topology
Fabian Paul created FLINK-25726: --- Summary: Implement GlobalCommitter as custom post commit topology Key: FLINK-25726 URL: https://issues.apache.org/jira/browse/FLINK-25726 Project: Flink Issue Type: Sub-task Components: API / DataStream, Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul The global committer was rarely used before in Sink V1 so we decided to make it an extension that is only exposed to a specific set of users. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25608) Mark metrics as Public(Evolving)
Fabian Paul created FLINK-25608: --- Summary: Mark metrics as Public(Evolving) Key: FLINK-25608 URL: https://issues.apache.org/jira/browse/FLINK-25608 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Affects Versions: 1.15.0 Reporter: Fabian Paul With the introduction of architectural tests and the exposure of the metrics API to user-facing components the metrics classes also need proper annotations. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25591) Use FileSource for StreamExecutionEnvironment.readFiles
Fabian Paul created FLINK-25591: --- Summary: Use FileSource for StreamExecutionEnvironment.readFiles Key: FLINK-25591 URL: https://issues.apache.org/jira/browse/FLINK-25591 Project: Flink Issue Type: New Feature Components: API / DataStream, Connectors / FileSystem Reporter: Fabian Paul Evaluate whether it is possible to replace the internals of StreamExecutionEnvironment.readFiles with the FileSource. The FileSource can already read the state of the previous version, but some functionality might be missing, e.g. nested file enumeration and file filtering. If the replacement is not possible, we should add a new method based on the FileSource and deprecate the old one. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25578) Graduate Sink V1 interfaces to PublicEvolving and deprecate them
Fabian Paul created FLINK-25578: --- Summary: Graduate Sink V1 interfaces to PublicEvolving and deprecate them Key: FLINK-25578 URL: https://issues.apache.org/jira/browse/FLINK-25578 Project: Flink Issue Type: Sub-task Components: API / Core Affects Versions: 1.15.0 Reporter: Fabian Paul In the discussion of FLIP-191 we decided not to break existing sinks and to give them time to migrate to Sink V2. We do not plan to delete Sink V1 in the near future and will still support it, but we won't develop new features for it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25575) Implement StreamGraph translation for Sink V2 interfaces
Fabian Paul created FLINK-25575: --- Summary: Implement StreamGraph translation for Sink V2 interfaces Key: FLINK-25575 URL: https://issues.apache.org/jira/browse/FLINK-25575 Project: Flink Issue Type: Sub-task Components: API / Core, API / DataStream Affects Versions: 1.15.0 Reporter: Fabian Paul This task covers the translation from the user-defined interfaces and the extension topologies to the actual operators. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25574) Update Async Sink to use decomposed interfaces
Fabian Paul created FLINK-25574: --- Summary: Update Async Sink to use decomposed interfaces Key: FLINK-25574 URL: https://issues.apache.org/jira/browse/FLINK-25574 Project: Flink Issue Type: Sub-task Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25573) Update Kafka Sink to use decomposed interfaces
Fabian Paul created FLINK-25573: --- Summary: Update Kafka Sink to use decomposed interfaces Key: FLINK-25573 URL: https://issues.apache.org/jira/browse/FLINK-25573 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka Affects Versions: 1.15.0 Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25572) Update File Sink to use decomposed interfaces
Fabian Paul created FLINK-25572: --- Summary: Update File Sink to use decomposed interfaces Key: FLINK-25572 URL: https://issues.apache.org/jira/browse/FLINK-25572 Project: Flink Issue Type: Sub-task Components: Connectors / FileSystem Affects Versions: 1.15.0 Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25571) Update Elasticsearch Sink to use decomposed interfaces
Fabian Paul created FLINK-25571: --- Summary: Update Elasticsearch Sink to use decomposed interfaces Key: FLINK-25571 URL: https://issues.apache.org/jira/browse/FLINK-25571 Project: Flink Issue Type: Sub-task Components: Connectors / ElasticSearch Affects Versions: 1.15.0 Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25570) Introduce Sink V2 extension APIs
Fabian Paul created FLINK-25570: --- Summary: Introduce Sink V2 extension APIs Key: FLINK-25570 URL: https://issues.apache.org/jira/browse/FLINK-25570 Project: Flink Issue Type: Sub-task Components: API / Core, Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Fix For: 1.15.0 This task introduces the interfaces needed to implement the custom operations before/after the writer and committer. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25569) Introduce decomposed Sink V2 interfaces
Fabian Paul created FLINK-25569: --- Summary: Introduce decomposed Sink V2 interfaces Key: FLINK-25569 URL: https://issues.apache.org/jira/browse/FLINK-25569 Project: Flink Issue Type: Sub-task Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Fabian Paul Fix For: 1.15.0 This task introduces the interfaces described in [1] without the datastream extension hooks. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25555) FLIP-191: Extend unified Sink interface to support small file compaction
Fabian Paul created FLINK-25555: --- Summary: FLIP-191: Extend unified Sink interface to support small file compaction Key: FLINK-25555 URL: https://issues.apache.org/jira/browse/FLINK-25555 Project: Flink Issue Type: New Feature Components: API / Core, API / DataStream, Connectors / Common Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25222) Remove NetworkFailureProxy used for Kafka connector tests
Fabian Paul created FLINK-25222: --- Summary: Remove NetworkFailureProxy used for Kafka connector tests Key: FLINK-25222 URL: https://issues.apache.org/jira/browse/FLINK-25222 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Test Infrastructure Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul Recently, the number of Kafka connector tests that either hit a timeout due to blocked networking or receive corrupted network responses has increased significantly. We think this is also caused by our custom network failure implementation. Since all of these tests cover the legacy FlinkKafkaProducer or FlinkKafkaConsumer, and we will not add more features to this connector, we can safely remove them to increase the overall stability. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25206) Add config option to disable configurations in the user program
Fabian Paul created FLINK-25206: --- Summary: Add config option to disable configurations in the user program Key: FLINK-25206 URL: https://issues.apache.org/jira/browse/FLINK-25206 Project: Flink Issue Type: New Feature Components: Client / Job Submission, Runtime / Coordination Affects Versions: 1.15.0 Reporter: Fabian Paul If you run a large number of Flink jobs usually there is a governing entity controlling and ensuring the execution of all clusters. To ease the control over configurations we want to enable setting a configuration that disallows configurations in the user program. By default, this is turned off so the normal execution will not be affected. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25134) Unused RetryRule in KafkaConsumerTestBase swallows retries
Fabian Paul created FLINK-25134: --- Summary: Unused RetryRule in KafkaConsumerTestBase swallows retries Key: FLINK-25134 URL: https://issues.apache.org/jira/browse/FLINK-25134 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.13.3, 1.14.0, 1.15.0 Reporter: Fabian Paul After merging https://issues.apache.org/jira/browse/FLINK-15493 a few tests are still not retried because the KafkaConsumerTestBase overwrites the RetryRule unnecessarily. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25041) E2E tar ball cache fails without error message if target directory not specified
Fabian Paul created FLINK-25041: --- Summary: E2E tar ball cache fails without error message if target directory not specified Key: FLINK-25041 URL: https://issues.apache.org/jira/browse/FLINK-25041 Project: Flink Issue Type: Bug Components: Test Infrastructure Affects Versions: 1.13.3, 1.14.0, 1.15.0 Reporter: Fabian Paul We want to verify that the variable has been set:
{code:bash}
if [ -z "$E2E_TARBALL_CACHE" ] ; then
    echo "You have to export the E2E Tarball Cache as E2E_TARBALL_CACHE"
    exit 1
fi
{code}
However, the code shown fails immediately with an `unbound variable` error if the variable is not set, and the branch is never evaluated. We should change it to something like this:
{code:bash}
# ${VAR+x} expands to "x" if VAR is set and to nothing otherwise,
# so the test is safe for unset variables and is true only when the variable is missing.
if [ -z "${E2E_TARBALL_CACHE+x}" ] ; then
    echo "You have to export the E2E Tarball Cache as E2E_TARBALL_CACHE"
    exit 1
fi
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24858) TypeSerializer version mismatch during eager restore
Fabian Paul created FLINK-24858: --- Summary: TypeSerializer version mismatch during eager restore Key: FLINK-24858 URL: https://issues.apache.org/jira/browse/FLINK-24858 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.13.3, 1.14.0, 1.15.0 Reporter: Fabian Paul Currently, some of our TypeSerializer snapshots assume information about the binary layout of the actual data rather than only holding information about the TypeSerializer. Multiple users ran into this problem, e.g. [https://lists.apache.org/thread/4q5q7wp0br96op6p7f695q2l8lz4wfzx] {quote}This manifests itself when state is restored eagerly (for example an operator state) but, for example, a user doesn't register the state in their initializeState/open, and then a checkpoint happens. The result is that we will have elements serialized according to an old binary layout, but our serializer snapshot declares a new version which indicates that the elements are written with a new binary layout. The next restore will fail. {quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24773) KafkaCommitter should fail on unknown Exception
Fabian Paul created FLINK-24773: --- Summary: KafkaCommitter should fail on unknown Exception Key: FLINK-24773 URL: https://issues.apache.org/jira/browse/FLINK-24773 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul Some of the exceptions during the committing phase are tolerable or even retriable but generally, if an unknown exception is raised we should fail the job. -- This message was sent by Atlassian Jira (v8.3.4#803005)
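The policy proposed in FLINK-24773 can be sketched as a small classification step. The exception types and the `Action` enum below are hypothetical placeholders, not Kafka or Flink classes; the point is only that anything not explicitly known to be retriable or tolerable falls through to failing the job.

```java
/** Plain-Java sketch of classifying commit failures; all types are hypothetical. */
final class CommitFailureSketch {

    enum Action { RETRY, IGNORE, FAIL_JOB }

    /** Stands in for errors that are worth retrying, e.g. a temporary outage. */
    static final class RetriableCommitException extends RuntimeException {}

    /** Stands in for errors that can be tolerated, e.g. an already finished transaction. */
    static final class TolerableCommitException extends RuntimeException {}

    /** Unknown exceptions are never swallowed: the default is to fail the job. */
    static Action classify(Throwable error) {
        if (error instanceof RetriableCommitException) {
            return Action.RETRY;
        }
        if (error instanceof TolerableCommitException) {
            return Action.IGNORE;
        }
        return Action.FAIL_JOB;
    }

    public static void main(String[] args) {
        System.out.println(classify(new RetriableCommitException()));
        System.out.println(classify(new TolerableCommitException()));
        System.out.println(classify(new IllegalStateException("unexpected")));
    }
}
```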
[jira] [Created] (FLINK-24765) Upgrade Kafka dependency
Fabian Paul created FLINK-24765: --- Summary: Upgrade Kafka dependency Key: FLINK-24765 URL: https://issues.apache.org/jira/browse/FLINK-24765 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.15.0 Reporter: Fabian Paul We rely on a very old Kafka version, 2.4.1, which was the last version supporting Scala 2.11. Since we dropped Scala 2.11, we can now update to a more recent one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24647) ClusterUncaughtExceptionHandler does not log the exception
Fabian Paul created FLINK-24647: --- Summary: ClusterUncaughtExceptionHandler does not log the exception Key: FLINK-24647 URL: https://issues.apache.org/jira/browse/FLINK-24647 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul If an uncaught exception occurs and the uncaught exception handler is configured to log, it swallows the exception stack trace. -- This message was sent by Atlassian Jira (v8.3.4#803005)
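A minimal sketch of the expected behaviour, assuming nothing about the actual ClusterUncaughtExceptionHandler code: the hypothetical `LoggingUncaughtExceptionHandler` below renders the full stack trace of the throwable instead of only a message. It writes into an in-memory buffer rather than a real logger to stay self-contained.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in for an uncaught exception handler running in "log" mode.
final class LoggingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

    // In-memory stand-in for the logger so the output can be inspected.
    private final StringBuilder sink = new StringBuilder();

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        sink.append(format(t, e));
    }

    // Renders the thread name together with the complete stack trace.
    static String format(Thread t, Throwable e) {
        StringWriter trace = new StringWriter();
        e.printStackTrace(new PrintWriter(trace, true));
        return "Uncaught exception in thread '" + t.getName() + "': " + trace;
    }

    String logged() {
        return sink.toString();
    }
}
```

With a real logging framework the same effect is usually achieved by passing the throwable as the last argument of the log call instead of only its message.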
[jira] [Created] (FLINK-24612) Kafka test container creates a large amount of logs
Fabian Paul created FLINK-24612: --- Summary: Kafka test container creates a large amount of logs Key: FLINK-24612 URL: https://issues.apache.org/jira/browse/FLINK-24612 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul When we use a testcontainer setup, we try to forward all container STDOUT logs to the surrounding test logger. Unfortunately, the Kafka loggers write a large number of logs by default because some of the internal loggers default to TRACE logging. A good example is this test job https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=25084&view=logs&j=32a18cd8-d404-5807-996d-abcee436b891 where one of the tests was stuck and the generated artifact is ~25GB. This makes debugging very hard because the file is hard to parse. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24608) Sinks built with the unified sink framework do not receive timestamps when used in Table API
Fabian Paul created FLINK-24608: --- Summary: Sinks built with the unified sink framework do not receive timestamps when used in Table API Key: FLINK-24608 URL: https://issues.apache.org/jira/browse/FLINK-24608 Project: Flink Issue Type: Bug Components: Connectors / Common, Table SQL / Planner Affects Versions: 1.13.3, 1.14.0, 1.15.0 Reporter: Fabian Paul All sinks built with the unified sink framework extract the timestamp from the internal {{StreamRecord}}. The Table API does not use the timestamp field in the {{StreamRecord}} but extracts the timestamp from the actual data. We either have to use a dedicated operator before all the sinks to simulate the behavior or allow a customizable timestamp extraction during the sink translation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24530) GlobalCommitter might not commit all records on drain
Fabian Paul created FLINK-24530: --- Summary: GlobalCommitter might not commit all records on drain Key: FLINK-24530 URL: https://issues.apache.org/jira/browse/FLINK-24530 Project: Flink Issue Type: Bug Components: API / DataStream, Connectors / Common Affects Versions: 1.13.2, 1.14.0, 1.15.0 Reporter: Fabian Paul If a pipeline has a unified Sink that implements a {{Committer}} and a {{GlobalCommitter}}, committables are forwarded from the {{Committer}} on receiving either {{notifyCheckpointCompleted}} or {{endInput}}. Since both methods are triggered by RPC calls, we currently do not guarantee the order in which either method is called on the {{Committer}} and the {{GlobalCommitter}}. As a result, the {{GlobalCommitter}} may receive the RPC call first and try to commit although the {{Committer}} has not forwarded anything yet. For normal streaming execution, this means that the visibility of the final commit can be delayed by one checkpoint. Unfortunately, if the pipeline is stopped with a savepoint and drained, {{endInput}} might be called on the {{GlobalCommitter}} before the {{Committer}}, leading to potential data loss. With FLIP-147 (checkpoints after task finish) fully implemented, the problem is mitigated because it guarantees the order of the final {{notifyCheckpointCompleted}} call, although it still suffers from the potentially delayed visibility. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24488) KafkaRecordSerializationSchemaBuilder does not forward timestamp
Fabian Paul created FLINK-24488: --- Summary: KafkaRecordSerializationSchemaBuilder does not forward timestamp Key: FLINK-24488 URL: https://issues.apache.org/jira/browse/FLINK-24488 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul When building a KafkaRecordSerializationSchema with its builder, the extracted event-time timestamp is currently not set on the resulting ProducerRecord. https://github.com/apache/flink/blob/026675a5cb8a3704c51802fb549d6b0bc4759835/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java#L328 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24397) Reduce TableSchema usage in Table API connectors
Fabian Paul created FLINK-24397: --- Summary: Reduce TableSchema usage in Table API connectors Key: FLINK-24397 URL: https://issues.apache.org/jira/browse/FLINK-24397 Project: Flink Issue Type: Improvement Components: Connectors / Common, Table SQL / API Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul The TableSchema class has been deprecated for a while, but a lot of connectors still depend on it (DynamicTableSources and DynamicTableSinks). We should reduce the usage of TableSchema to provide a better example for external contributions and to allow us to eventually drop TableSchema. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24382) RecordsOut metric for sinks is inaccurate
Fabian Paul created FLINK-24382: --- Summary: RecordsOut metric for sinks is inaccurate Key: FLINK-24382 URL: https://issues.apache.org/jira/browse/FLINK-24382 Project: Flink Issue Type: Improvement Components: Connectors / Common, Runtime / Metrics Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul Currently, the metric is computed on the operator level and it is assumed that every record flowing into the sink also generates one outgoing record. This is often not reasonable because the sinks can transform incoming records into multiple outgoing records, thus the metric should be implemented by the sink implementors and not be reasoned by the framework. -- This message was sent by Atlassian Jira (v8.3.4#803005)
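To illustrate why the one-in/one-out assumption breaks, here is a small sketch with a hypothetical `SplittingWriter` that is not part of Flink. It emits one outgoing record per word of an incoming line and counts outgoing records at the point where they actually leave the writer.

```java
import java.util.ArrayList;
import java.util.List;

final class RecordsOutSketch {

    // Hypothetical writer that turns one incoming line into several outgoing words.
    static final class SplittingWriter {
        private final List<String> emitted = new ArrayList<>();
        private long numRecordsIn;
        private long numRecordsOut;

        void write(String line) {
            numRecordsIn++;
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    emitted.add(word);
                    // Counted by the sink itself, once per outgoing record.
                    numRecordsOut++;
                }
            }
        }

        long recordsIn() {
            return numRecordsIn;
        }

        long recordsOut() {
            return numRecordsOut;
        }
    }
}
```

A framework-level counter that only observes `write` calls would report the value of `recordsIn()` here, which differs from `recordsOut()` as soon as a line contains more than one word.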
[jira] [Created] (FLINK-24372) Deprecate Elasticsearch Sinkfunctions
Fabian Paul created FLINK-24372: --- Summary: Deprecate Elasticsearch Sinkfunctions Key: FLINK-24372 URL: https://issues.apache.org/jira/browse/FLINK-24372 Project: Flink Issue Type: Sub-task Components: Connectors / ElasticSearch Affects Versions: 1.15.0 Reporter: Fabian Paul Once all other tickets of FLINK-24323 are resolved we can mark the Elasticsearch sinks implementing sinkfunction as deprecated. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24371) Support SinkWriter preCommit without the need of a committer
Fabian Paul created FLINK-24371: --- Summary: Support SinkWriter preCommit without the need of a committer Key: FLINK-24371 URL: https://issues.apache.org/jira/browse/FLINK-24371 Project: Flink Issue Type: Bug Components: API / DataStream, Connectors / Common Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul For some sinks, e.g. Elasticsearch, we only want to listen for the snapshot barrier; once it is received, the sink flushes the buffered records. These sinks do not snapshot any state and thus do not implement the `snapshotState()` method. We already have a NoopCommitHandler which swallows the passed committables, but it should at least call `preCommit()` on the SinkWriter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24368) Collect logs of Flink container in e2e source test suite
Fabian Paul created FLINK-24368: --- Summary: Collect logs of Flink container in e2e source test suite Key: FLINK-24368 URL: https://issues.apache.org/jira/browse/FLINK-24368 Project: Flink Issue Type: Improvement Components: Test Infrastructure Affects Versions: 1.14.0 Reporter: Fabian Paul Currently, the logs from the Flink containers used for testing can only be accessed by manually looking into the container via docker commands. This makes debugging source e2e test failures on the CI very difficult. I see two options: # Stream the logs to stdout while the test is running # After a test has failed, copy the logs to some log directory (this has the disadvantage that, in case the container is stuck and the timeout daemon kills the test, we do not get a nice thread dump because the container is not part of the test JVM) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24347) KafkaSource cannot checkpoint if the parallelism is higher than the partition number
Fabian Paul created FLINK-24347: --- Summary: KafkaSource cannot checkpoint if the parallelism is higher than the partition number Key: FLINK-24347 URL: https://issues.apache.org/jira/browse/FLINK-24347 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.13.2, 1.14.0 Reporter: Fabian Paul The KafkaSourceEnumerator signals all the readers if there are no more splits available, and the readers shut down. In case the parallelism is higher than the number of partitions of the consumed topic, there are not enough partitions to distribute and the surplus reader subtasks immediately go into the FINISHED state. Currently, it is not possible to checkpoint if parts of the job are finished (this limitation is lifted once FLIP-147 is enabled by default, in Flink 1.15+). We should only signal the readers to finish if the job is in bounded execution mode and keep them idling otherwise. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24308) Translate KafkaSink docs to chinese
Fabian Paul created FLINK-24308: --- Summary: Translate KafkaSink docs to chinese Key: FLINK-24308 URL: https://issues.apache.org/jira/browse/FLINK-24308 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.14.0 Reporter: Fabian Paul With https://issues.apache.org/jira/browse/FLINK-23664 only the English documentation was updated. We also have to update the Chinese docs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24292) Update Flink's Kafka examples to use KafkaSink
Fabian Paul created FLINK-24292: --- Summary: Update Flink's Kafka examples to use KafkaSink Key: FLINK-24292 URL: https://issues.apache.org/jira/browse/FLINK-24292 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24282) KafkaRecordSerializationSchema TopicSelector is not serializable
Fabian Paul created FLINK-24282: --- Summary: KafkaRecordSerializationSchema TopicSelector is not serializable Key: FLINK-24282 URL: https://issues.apache.org/jira/browse/FLINK-24282 Project: Flink Issue Type: Bug Affects Versions: 1.14.0 Reporter: Fabian Paul To dynamically calculate the outgoing topic we allow passing a lambda. Unfortunately, it is currently not marked as serializable, hence the following code fails during closure cleaning when used within a job. {code:java} KafkaRecordSerializationSchema.builder() .setTopic(topic) .setValueSerializationSchema(serSchema) .setPartitioner(partitioner) .build()) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
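The underlying Java behaviour can be shown without Flink or Kafka. The sketch below uses two hypothetical interfaces, `PlainSelector` and `SerializableSelector`, which are illustrative names and not the connector's types. A lambda is only serializable if its target functional interface extends `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

final class SerializableSelectorSketch {

    // Plain functional interface: lambdas assigned to it are not serializable.
    interface PlainSelector<T> extends Function<T, String> {}

    // Additionally extending Serializable makes lambdas targeting it serializable.
    interface SerializableSelector<T> extends Function<T, String>, Serializable {}

    // Returns true if Java serialization accepts the given object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        PlainSelector<String> plain = element -> "topic-" + element;
        SerializableSelector<String> fixed = element -> "topic-" + element;
        System.out.println("plain serializable: " + isSerializable(plain));
        System.out.println("fixed serializable: " + isSerializable(fixed));
    }
}
```

Declaring the selector interface as `Serializable` is one possible fix; whether the connector should do exactly this is a design decision for the ticket.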
[jira] [Created] (FLINK-24281) Migrate all existing tests to new Kafka Sink
Fabian Paul created FLINK-24281: --- Summary: Migrate all existing tests to new Kafka Sink Key: FLINK-24281 URL: https://issues.apache.org/jira/browse/FLINK-24281 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul The FlinkKafkaProducer has been deprecated since 1.14, but a lot of existing tests are still using it. We should replace it with the KafkaSink because it completely subsumes it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24079) BufferedUpsertSinkFunction can loose records during failover
Fabian Paul created FLINK-24079: --- Summary: BufferedUpsertSinkFunction can loose records during failover Key: FLINK-24079 URL: https://issues.apache.org/jira/browse/FLINK-24079 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Table SQL / Ecosystem Affects Versions: 1.13.2 Reporter: Fabian Paul The internally used buffer is not snapshotted on checkpoint, which can lead to losing records on failure. We need to snapshot the buffer similarly to FLINK-23875. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24055) Deprecate FlinkKafkaConsumer
Fabian Paul created FLINK-24055: --- Summary: Deprecate FlinkKafkaConsumer Key: FLINK-24055 URL: https://issues.apache.org/jira/browse/FLINK-24055 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul With the introduction of the KafkaSource https://issues.apache.org/jira/browse/FLINK-18323 we should deprecate the FlinkKafkaConsumer to hint users to start the migration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24051) Make consumer.group-id optional for KafkaSource
Fabian Paul created FLINK-24051: --- Summary: Make consumer.group-id optional for KafkaSource Key: FLINK-24051 URL: https://issues.apache.org/jira/browse/FLINK-24051 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul For most of the users it is not necessary to generate a group-id and the source itself can provide a meaningful group-id during startup. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24032) StreamSink does not receive periodic watermarks
Fabian Paul created FLINK-24032: --- Summary: StreamSink does not receive periodic watermarks Key: FLINK-24032 URL: https://issues.apache.org/jira/browse/FLINK-24032 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.14.0 Reporter: Fabian Paul In the following scenario, the sink will never receive watermarks {code:java} env.readFile(...) .assignTimestampsAndWatermarks(format, file) .rebalance() .addSink(...); {code} I also noticed that when changing the code to the following the watermarks flow to the sink {code:java} env.readFile(...) .assignTimestampsAndWatermarks(format, file) .rebalance() .process(new ProcessFunction() {...}) .addSink(...); {code} An example test case is accessible here [https://github.com/fapaul/flink/blob/9b749ac80cd128a7f288da45db313bafa39d8008/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java#L68] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24006) MailboxExecutorImplTest#testIsIdle does not test the correct behaviour
Fabian Paul created FLINK-24006: --- Summary: MailboxExecutorImplTest#testIsIdle does not test the correct behaviour Key: FLINK-24006 URL: https://issues.apache.org/jira/browse/FLINK-24006 Project: Flink Issue Type: Bug Components: Runtime / Task, Tests Affects Versions: 1.13.2, 1.12.5, 1.14.0 Reporter: Fabian Paul The test was introduced to ensure that the mailbox idleness check still counts new messages although the mailbox loop might have been stopped. Unfortunately, the test currently does not stop the mailbox processor, which means that the test passes even without the actual code changes of https://issues.apache.org/jira/browse/FLINK-19109 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23875) ReducingUpsertSink can loose record during failover
Fabian Paul created FLINK-23875: --- Summary: ReducingUpsertSink can loose record during failover Key: FLINK-23875 URL: https://issues.apache.org/jira/browse/FLINK-23875 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Table SQL / API Affects Versions: 1.14.0 Reporter: Fabian Paul When trying to rework the Table API Kafka connector to make it compatible with the new KafkaSink I noticed that currently the buffer which is used to reduce the update-before and update-after calls is not snapshotted which can result in data loss if the job fails while the buffer is not empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23838) Add FLIP-33 metrics to new KafkaSink
Fabian Paul created FLINK-23838: --- Summary: Add FLIP-33 metrics to new KafkaSink Key: FLINK-23838 URL: https://issues.apache.org/jira/browse/FLINK-23838 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka Reporter: Fabian Paul Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23814) Test FLIP-143 KafkaSink
Fabian Paul created FLINK-23814: --- Summary: Test FLIP-143 KafkaSink Key: FLINK-23814 URL: https://issues.apache.org/jira/browse/FLINK-23814 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka Reporter: Fabian Paul Fix For: 1.14.0 The following scenarios are worthwhile to test: * Start a simple job with None/At-least-once delivery guarantee and write records to a Kafka topic * Start a simple job with exactly-once delivery guarantee and write records to a Kafka topic. The records should only be visible with a `read-committed` consumer * Stop a job with exactly-once delivery guarantee and restart it with different parallelism (scale-down, scale-up) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23735) Migrate BufferedUpsertSinkFunction to FLIP-143
Fabian Paul created FLINK-23735: --- Summary: Migrate BufferedUpsertSinkFunction to FLIP-143 Key: FLINK-23735 URL: https://issues.apache.org/jira/browse/FLINK-23735 Project: Flink Issue Type: Sub-task Reporter: Fabian Paul The BufferedUpsertSinkFunction is still using the old sink interfaces and relies on the old Kafka DataStream connector FlinkKafkaProducer. We need to migrate it to the new Sink API to also leverage the new KafkaSink connector and finally deprecate the FlinkKafkaProducer and all its belongings. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23710) Move sink to org.apache.kafka.conntor.kafka.sink package
Fabian Paul created FLINK-23710: --- Summary: Move sink to org.apache.kafka.conntor.kafka.sink package Key: FLINK-23710 URL: https://issues.apache.org/jira/browse/FLINK-23710 Project: Flink Issue Type: Sub-task Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23664) Write documentation for new KafkaSink
Fabian Paul created FLINK-23664: --- Summary: Write documentation for new KafkaSink Key: FLINK-23664 URL: https://issues.apache.org/jira/browse/FLINK-23664 Project: Flink Issue Type: Sub-task Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23640) Create a KafkaRecordSerializationSchemas valueOnly helper
Fabian Paul created FLINK-23640: --- Summary: Create a KafkaRecordSerializationSchemas valueOnly helper Key: FLINK-23640 URL: https://issues.apache.org/jira/browse/FLINK-23640 Project: Flink Issue Type: Sub-task Reporter: Fabian Paul Commonly users only want to serialize the value of a Kafka record if they are not using a key schema. For these users, we can provide an easier entry point comparable to {{KafkaValueOnlyDeserializerWrapper}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23639) Migrate Table API to new KafkaSink
Fabian Paul created FLINK-23639: --- Summary: Migrate Table API to new KafkaSink Key: FLINK-23639 URL: https://issues.apache.org/jira/browse/FLINK-23639 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka, Table SQL / API Reporter: Fabian Paul With the KafkaSink ported to FLIP-143 we should also adapt the Table API to leverage the new KafkaSink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23248) SinkWriter is not close when failing
Fabian Paul created FLINK-23248: --- Summary: SinkWriter is not close when failing Key: FLINK-23248 URL: https://issues.apache.org/jira/browse/FLINK-23248 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.12.4, 1.13.1, 1.14.0 Reporter: Fabian Paul Currently the SinkWriter is only closed when the operator finishes in `AbstractSinkWriterOperator#close()`, but we must also close the SinkWriter in `AbstractSinkWriterOperator#dispose()` to release possibly acquired resources when failing. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23124) Implement exactly-once Kafka Sink
Fabian Paul created FLINK-23124: --- Summary: Implement exactly-once Kafka Sink Key: FLINK-23124 URL: https://issues.apache.org/jira/browse/FLINK-23124 Project: Flink Issue Type: Sub-task Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23123) Implement at-least-once Kafka Sink
Fabian Paul created FLINK-23123: --- Summary: Implement at-least-once Kafka Sink Key: FLINK-23123 URL: https://issues.apache.org/jira/browse/FLINK-23123 Project: Flink Issue Type: Sub-task Reporter: Fabian Paul -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22696) Enable Confluent Schema Registry Test on jdk 11
Fabian Paul created FLINK-22696: --- Summary: Enable Confluent Schema Registry Test on jdk 11 Key: FLINK-22696 URL: https://issues.apache.org/jira/browse/FLINK-22696 Project: Flink Issue Type: Improvement Components: Test Infrastructure Reporter: Fabian Paul Assignee: Fabian Paul -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22434) Dispatcher does not store suspended jobs in execution graph store
Fabian Paul created FLINK-22434: --- Summary: Dispatcher does not store suspended jobs in execution graph store Key: FLINK-22434 URL: https://issues.apache.org/jira/browse/FLINK-22434 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Fabian Paul Assignee: Fabian Paul Fix For: 1.11.4, 1.14.0, 1.12.3, 1.13.1 Only globally terminated jobs are currently stored in the execution graph store after termination. In case the JobManager is shut down while jobs are still running, these jobs will be suspended, which is a non-globally terminated state. The problem surfaces when a user tries to access information about the job during termination, leading to a job not found response. Storing all terminated jobs in the execution graph store should fix this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22257) Clarify Flink ConfigOptions Usage
Fabian Paul created FLINK-22257: --- Summary: Clarify Flink ConfigOptions Usage Key: FLINK-22257 URL: https://issues.apache.org/jira/browse/FLINK-22257 Project: Flink Issue Type: Improvement Reporter: Fabian Paul For users, it is hard to determine which ConfigOptions are relevant for the different stages of a Flink application, from the translation of the user program to the execution on the cluster. In particular, it is unclear which options can be configured through the different channels: * Cluster configuration (i.e. flink-conf.yaml) * Application configuration, code-based -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22256) Persist checkpoint type information
Fabian Paul created FLINK-22256: --- Summary: Persist checkpoint type information Key: FLINK-22256 URL: https://issues.apache.org/jira/browse/FLINK-22256 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Fabian Paul As a user, it is retrospectively difficult to determine what kind of checkpoint (i.e. incremental, unaligned, ...) was performed when looking only at the persisted checkpoint metadata. The only way would be to look into the execution configuration of the job which might not be available anymore and can be scattered across the application code and cluster configuration. It would be highly beneficial if such information would be part of the persisted metadata to not track these external pointers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21879) ActiveResourceManagerTest.testWorkerRegistrationTimeoutNotCountingAllocationTime fails on AZP
Fabian Paul created FLINK-21879: --- Summary: ActiveResourceManagerTest.testWorkerRegistrationTimeoutNotCountingAllocationTime fails on AZP Key: FLINK-21879 URL: https://issues.apache.org/jira/browse/FLINK-21879 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Fabian Paul https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15047&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=6760 {code:java} [ERROR] testWorkerRegistrationTimeoutNotCountingAllocationTime(org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest) Time elapsed: 0.388 s <<< FAILURE! java.lang.AssertionError: Expected: an instance of org.apache.flink.runtime.registration.RegistrationResponse$Success but: is a org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationRejection at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest$13.lambda$new$2(ActiveResourceManagerTest.java:789) at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest$Context.runTest(ActiveResourceManagerTest.java:857) at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest$13.(ActiveResourceManagerTest.java:770) at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.testWorkerRegistrationTimeoutNotCountingAllocationTime(ActiveResourceManagerTest.java:753) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18501) Mapping of Pluggable Filesystems to scheme is not properly logged
Fabian Paul created FLINK-18501: --- Summary: Mapping of Pluggable Filesystems to scheme is not properly logged Key: FLINK-18501 URL: https://issues.apache.org/jira/browse/FLINK-18501 Project: Flink Issue Type: Improvement Components: FileSystems Affects Versions: 1.10.0, 1.11.0 Reporter: Fabian Paul Currently, the following statement is used during FS instantiation {code:java} LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName()); {code} but with the introduction of pluggable filesystems, the class name will always be PluginFileSystemFactory and does not give a notion which FS is used anymore. A fix could be to replace the class name usage with toString and implement it within the PluginFileSystemFactory. -- This message was sent by Atlassian Jira (v8.3.4#803005)
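A rough sketch of the suggested fix, using simplified stand-ins rather than the real Flink classes: `FileSystemFactory`, `S3Factory` and `PluginWrapper` below are hypothetical, and the wrapper's `toString` names the wrapped factory so that the log line identifies the actual file system.

```java
final class PluginFactoryLoggingSketch {

    // Simplified stand-in for a file system factory interface.
    interface FileSystemFactory {
        String getScheme();
    }

    // Hypothetical concrete factory that is loaded through a plugin.
    static final class S3Factory implements FileSystemFactory {
        @Override
        public String getScheme() {
            return "s3";
        }
    }

    // Hypothetical wrapper playing the role of the plugin factory.
    static final class PluginWrapper implements FileSystemFactory {
        private final FileSystemFactory inner;

        PluginWrapper(FileSystemFactory inner) {
            this.inner = inner;
        }

        @Override
        public String getScheme() {
            return inner.getScheme();
        }

        // Names the wrapped factory instead of the wrapper class.
        @Override
        public String toString() {
            return "Plugin " + inner.getClass().getName();
        }
    }

    // Builds the log message from toString() rather than getClass().getName().
    static String describe(FileSystemFactory factory) {
        return "Added file system " + factory.getScheme() + ":" + factory;
    }
}
```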
[jira] [Created] (FLINK-16883) No support for log4j2 configuration formats besides properties
Fabian Paul created FLINK-16883: --- Summary: No support for log4j2 configuration formats besides properties Key: FLINK-16883 URL: https://issues.apache.org/jira/browse/FLINK-16883 Project: Flink Issue Type: Improvement Components: Command Line Client Affects Versions: 1.11.0 Reporter: Fabian Paul If `flink-console.sh` is used to start a Flink cluster, the env java opts precede the log settings ([link|https://github.com/apache/flink/blob/1444fd68115594b872201242886b4d789f4b26a5/flink-dist/src/main/flink-bin/bin/flink-console.sh#L73]). This way the log setting `log4j.configurationFile` will always overwrite previous keys. Since `log4j.configurationFile` is set to `log4j.properties`, it is not possible to use formats other than properties for the configuration. My proposal would be to switch the order of the configurations so that the log settings precede the env java opts. Users could then overwrite the default file with their configurations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16570) Difficulties to select correct metric with long name in dropdown of Flink UI task menu
Fabian Paul created FLINK-16570: --- Summary: Difficulties to select correct metric with long name in dropdown of Flink UI task menu Key: FLINK-16570 URL: https://issues.apache.org/jira/browse/FLINK-16570 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Reporter: Fabian Paul Attachments: metrics_dropdown.png As seen in the attached image it is currently difficult to select the correct metrics when the metric name exceeds the length of the dropdown because the full name cannot be seen. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16553) KafkaFetcher topic/partition metrics
Fabian Paul created FLINK-16553: --- Summary: KafkaFetcher topic/partition metrics Key: FLINK-16553 URL: https://issues.apache.org/jira/browse/FLINK-16553 Project: Flink Issue Type: Improvement Components: Connectors / Kafka, Runtime / Metrics Reporter: Fabian Paul When using the Kafka universal connector, currently not all KafkaFetcher metrics ([link|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java]) which are exposed through the KafkaConsumer are accessible within the Flink metrics system. In particular, all metrics which are related to topics and partitions are not available. The KafkaConsumer internally only registers those metrics after it has fetched some records. Unfortunately, at the moment Flink only checks the available metrics right after the initialization of the KafkaConsumer, when no records have been polled yet. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16525) TwoPhaseCommitSinkFunction subtask logs misleading name
Fabian Paul created FLINK-16525: --- Summary: TwoPhaseCommitSinkFunction subtask logs misleading name Key: FLINK-16525 URL: https://issues.apache.org/jira/browse/FLINK-16525 Project: Flink Issue Type: Bug Components: Runtime / Task Reporter: Fabian Paul The current name() function in TwoPhaseCommitSinkFunction tries to describe the currently running subtask with its class name, the index of the subtask and the number of parallel subtasks. Since the starting index of the subtask is 0, and the starting number for the parallelism is 1, it could lead to the following log message. {code:java} 15:59:41,448 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 0/1 - checkpoint 1 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1583852371370} from checkpoint 1 {code} Although only one subtask is running it describes the subtask as 0/1 which might indicate more than one subtask. I would suggest incrementing the first number after the class name by 1 to better indicate how many subtasks are running. -- This message was sent by Atlassian Jira (v8.3.4#803005)
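The suggested change could look roughly like this. `SubtaskNameSketch.name` is a hypothetical helper, not the actual `name()` method of TwoPhaseCommitSinkFunction; it only shows the one-based formatting of a zero-based subtask index.

```java
final class SubtaskNameSketch {

    // subtaskIndex is zero-based, parallelism counts from one,
    // so the index is shifted by one before it is printed.
    static String name(String className, int subtaskIndex, int parallelism) {
        return String.format("%s %d/%d", className, subtaskIndex + 1, parallelism);
    }

    public static void main(String[] args) {
        System.out.println(name("FlinkKafkaProducer", 0, 1));
    }
}
```

With this formatting a single running subtask is reported as 1/1 instead of 0/1.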