[jira] [Created] (FLINK-35419) scan.bounded.latest-offset makes queries never finish if the latest message is an EndTxn Kafka marker

2024-05-22 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-35419:
---

 Summary: scan.bounded.latest-offset makes queries never finish if 
the latest message is an EndTxn Kafka marker
 Key: FLINK-35419
 URL: https://issues.apache.org/jira/browse/FLINK-35419
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.19.0, 1.17.0, 1.16.0, 1.8.0
Reporter: Fabian Paul


When running the Kafka connector in bounded mode, the stop condition can be 
defined as the latest offset at the time the job starts.

 

Unfortunately, Kafka's latest offset calculation also includes special marker 
records, such as transaction markers, in the overall count.

 

When Flink waits for a job to finish, it compares the number of records read 
up to that point with the original latest offset [1]. Since the consumer will 
never see the special marker records, the latest offset is never reached and 
the job gets stuck.

 

To reproduce the issue, you can write into a Kafka topic and make sure that the 
latest record is a transaction end event. Afterwards, you can start a Flink job 
configured with `scan.bounded.latest-offset` pointing to that topic.
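
A minimal repro sketch (broker address and topic name are placeholders): the 
final commitTransaction() call appends an EndTxn control record behind the 
last data record, so the latest offset points past an entry a consumer will 
never receive.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
props.put("transactional.id", "repro-txn");
props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.initTransactions();
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("input-topic", "key", "value"));
    // Writes the EndTxn marker as the last entry in the topic.
    producer.commitTransaction();
}
{code}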

 

[1]https://github.com/confluentinc/flink/blob/59c5446c4aac0d332a21b456f4a3f82576104b80/flink-connectors/confluent-connector-kafka/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L128



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2022-11-29 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-30238:
---

 Summary: Unified Sink committer does not clean up state on final 
savepoint
 Key: FLINK-30238
 URL: https://issues.apache.org/jira/browse/FLINK-30238
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.3, 1.17.0, 1.16.1
Reporter: Fabian Paul


During stop-with-savepoint, the committer only commits the pending committables 
on notifyCheckpointComplete.

This has several downsides.
 * The last committableSummary has checkpoint id LONG.MAX and is never cleared 
from the state, so stop-with-savepoint does not work when the pipeline 
recovers from a savepoint.
 * While the committables are committed during stop-with-savepoint, they are 
not forwarded to the post-commit topology, potentially losing data and 
preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29583) Ensure correct subtaskId and checkpointId are set during committer state migration

2022-10-11 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-29583:
---

 Summary: Ensure correct subtaskId and checkpointId are set during 
committer state migration
 Key: FLINK-29583
 URL: https://issues.apache.org/jira/browse/FLINK-29583
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Fabian Paul


We already discovered two problems during the recovery of committers when a 
post-commit topology is used:

 * https://issues.apache.org/jira/browse/FLINK-29509
 * https://issues.apache.org/jira/browse/FLINK-29512

Both problems also apply when recovering Flink 1.14 unified sink committer 
state and migrating it to the extended unified model.

As part of this ticket we should fix both issues for the migration and also 
increase the test coverage for the migration cases, i.e. add test cases to 
CommitterOperatorTest and CommittableCollectorSerializerTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29512) Align SubtaskCommittableManager checkpointId with CheckpointCommittableManagerImpl checkpointId during recovery

2022-10-05 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-29512:
---

 Summary: Align SubtaskCommittableManager checkpointId with 
CheckpointCommittableManagerImpl checkpointId during recovery
 Key: FLINK-29512
 URL: https://issues.apache.org/jira/browse/FLINK-29512
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.2, 1.17.0, 1.16.1
Reporter: Fabian Paul


Similar to the issue described in 
https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of 
committables the subtaskCommittables checkpointId is always set to 1 
[https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193]
while the holding CheckpointCommittableManager is initialized with the 
checkpointId that is written into state 
[https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155].

As a result, during recovery the post-commit topology will receive a 
committable summary with the recovered checkpointId but multiple 
`CommittableWithLineage`s with the reset checkpointId, causing orphaned 
`CommittableWithLineage`s without a matching `CommittableSummary` and failing 
the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29509) Set correct subtaskId during recovery of committables

2022-10-05 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-29509:
---

 Summary: Set correct subtaskId during recovery of committables
 Key: FLINK-29509
 URL: https://issues.apache.org/jira/browse/FLINK-29509
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.2, 1.17.0, 1.16.1
Reporter: Fabian Paul


When we recover the `CheckpointCommittableManager`, we ignore the subtaskId it 
is recovered on: 
[https://github.com/apache/flink/blob/d191bda7e63a2c12416cba56090e5cd75426079b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java#L58]

This becomes a problem when a sink uses a post-commit topology because multiple 
committer operators might forward committable summaries coming from the same 
subtaskId.

 

It should be possible to use the subtaskId already present in the 
`CommittableCollector` when creating the `CheckpointCommittableManager`s.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27493) Forward all numeric Kafka metrics to Flink's metrics system

2022-05-05 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-27493:
---

 Summary: Forward all numeric Kafka metrics to Flink's metrics 
system
 Key: FLINK-27493
 URL: https://issues.apache.org/jira/browse/FLINK-27493
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka
Affects Versions: 1.16.0
Reporter: Fabian Paul


With the upgrade of the Kafka version to 2.8, it is now possible to access more 
metrics from the KafkaConsumer/KafkaProducer. So far we only forward metrics 
that are of type Double and ignore other numeric values. It might be worthwhile 
to forward all numeric metrics.
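
A sketch of the direction ({{forwardNumericMetrics}} is a hypothetical 
helper): register every Kafka metric whose current value is a Number, not 
only the Double-valued ones.
{code:java}
import java.util.Map;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Hypothetical helper: forward every numeric Kafka metric as a Flink gauge.
static void forwardNumericMetrics(
        Map<MetricName, ? extends Metric> kafkaMetrics, MetricGroup group) {
    for (Map.Entry<MetricName, ? extends Metric> entry : kafkaMetrics.entrySet()) {
        final Metric metric = entry.getValue();
        // metricValue() returns Object and covers non-Double numeric types.
        if (metric.metricValue() instanceof Number) {
            Gauge<Double> gauge = () -> ((Number) metric.metricValue()).doubleValue();
            group.gauge(entry.getKey().name(), gauge);
        }
    }
}
{code}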



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27486) Reduce ArchUnit violations in connector base module

2022-05-04 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-27486:
---

 Summary: Reduce ArchUnit violations in connector base module
 Key: FLINK-27486
 URL: https://issues.apache.org/jira/browse/FLINK-27486
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Affects Versions: 1.16.0
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27484) Reduce ArchUnit violations in the project

2022-05-04 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-27484:
---

 Summary: Reduce ArchUnit violations in the project
 Key: FLINK-27484
 URL: https://issues.apache.org/jira/browse/FLINK-27484
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, Connectors / Common, Runtime / Configuration
Reporter: Fabian Paul


When ArchUnit was introduced, we deliberately ignored the existing violations. 
This is the umbrella ticket to track the efforts to reduce them.

In the long run, this gives our users clarity about whether or not to use a 
certain part of the codebase.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27480) KafkaSources sharing the groupId might lead to InstanceAlreadyExistException warning

2022-05-03 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-27480:
---

 Summary: KafkaSources sharing the groupId might lead to 
InstanceAlreadyExistException warning
 Key: FLINK-27480
 URL: https://issues.apache.org/jira/browse/FLINK-27480
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.4, 1.15.0, 1.16.0
Reporter: Fabian Paul


More and more frequently, users run into an issue caused by not correctly 
configuring the KafkaSource 
(https://stackoverflow.com/questions/72026997/flink-instancealreadyexistsexception-while-migrating-to-the-kafkasource)
and setting non-distinguishable groupIds for the source. Internally, the 
KafkaConsumer tries to register with the metric system and incorporates the 
groupId as part of the metric name, leading to name collisions.

We should update the documentation to explain the situation properly.
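
Until then, a sketch of the workaround for users (broker and topics are 
placeholders): give every KafkaSource a distinct groupId so the underlying 
KafkaConsumers register under distinct metric names.
{code:java}
KafkaSource<String> sourceA = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092") // placeholder broker
        .setTopics("topic-a")
        .setGroupId("my-app-source-a")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

KafkaSource<String> sourceB = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("topic-b")
        .setGroupId("my-app-source-b") // distinct from sourceA
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
{code}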



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-26701) Relocation of connector-base might break user jars due to changed imports

2022-03-17 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26701:
---

 Summary: Relocation of connector-base might break user jars due to 
changed imports
 Key: FLINK-26701
 URL: https://issues.apache.org/jira/browse/FLINK-26701
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul
 Fix For: 1.15.0


With the introduction of FLINK-25927, every connector now relocates 
connector-base to better support connector compatibility with different Flink 
versions. Unfortunately, not all classes in connector-base are used only by 
the connectors; some are supposed to be used inside the user jar directly, 
e.g. DeliveryGuarantee, HybridSource...

Since the connectors now relocate the module, the existing imports are broken.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26633) Elasticsearch connector does not report recordsSend metric

2022-03-14 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26633:
---

 Summary: Elasticsearch connector does not report recordsSend metric
 Key: FLINK-26633
 URL: https://issues.apache.org/jira/browse/FLINK-26633
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.15.0
Reporter: Fabian Paul
 Fix For: 1.15.0


As part of the unified sink, it is recommended to report a recordsSend metric 
to let users track the number of outgoing records from Flink.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26613) Stateful unified Sink V2 upgrades only work when operator uids are given

2022-03-11 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26613:
---

 Summary: Stateful unified Sink V2 upgrades only work when operator 
uids are given
 Key: FLINK-26613
 URL: https://issues.apache.org/jira/browse/FLINK-26613
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul
 Fix For: 1.15.0


As part of the documentation [1] we guarantee that, in case a stateful 
migration fails and no uids were set, it is possible to bind the recovered 
state via the uidHash.

This ticket should add a uidHash setter for the operators that are affected by 
the migration (writer, committer, global committer).

 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#preconditions
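
A rough sketch of the user-facing side of such setters (class and method 
names are illustrative, not a final API; the hashes would be taken from the 
old savepoint):
{code:java}
// Illustrative only: per-operator uidHash setters for the operators affected
// by the migration (writer, committer, global committer).
CustomSinkOperatorUidHashes uidHashes = CustomSinkOperatorUidHashes.builder()
        .setWriterUidHash("2e588ce1c86a9d46e2e85186773ce4fc")
        .setCommitterUidHash("52020f67cc794d0d9b054e34e90a4a5c")
        .build();

stream.sinkTo(sink, uidHashes);
{code}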



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26516) Sink V2 is not state compatible with Sink V1

2022-03-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26516:
---

 Summary: Sink V2 is not state compatible with Sink V1
 Key: FLINK-26516
 URL: https://issues.apache.org/jira/browse/FLINK-26516
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul
 Fix For: 1.15.0


While working on https://issues.apache.org/jira/browse/FLINK-26173 we decided 
to split off the state compatibility issue to lower the priority of the 
behavioral issue, since it does not affect many users.

This ticket is solely about fixing the state incompatibility between Sink V1 
and Sink V2.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26445) Unified Sink cannot restore committer state

2022-03-01 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26445:
---

 Summary: Unified Sink cannot restore committer state
 Key: FLINK-26445
 URL: https://issues.apache.org/jira/browse/FLINK-26445
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.14.3
Reporter: Fabian Paul


In Flink 1.14, the writer and committer operators became one operator in 
streaming mode. Unfortunately, the state is still associated with the previous 
physical operator, so during restore the job fails due to the non-restored 
state of the committer operator.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26416) Release Testing:

2022-02-28 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26416:
---

 Summary: Release Testing: 
 Key: FLINK-26416
 URL: https://issues.apache.org/jira/browse/FLINK-26416
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul
 Fix For: 1.15.0


With the introduction of Sink V2, the operator model of the sink changed 
slightly; therefore it makes sense to test different upgrade/sanity scenarios.

You can take any of the existing sinks in the project. I would recommend the 
FileSink.

 # Run a job with Flink 1.14, take a savepoint, and try to restore and resume 
with 1.15
 # Run a job with Flink 1.15, take a savepoint, and try to restore and resume 
with 1.15
 # Run a bounded job with Flink 1.15

In all cases, please verify that all records have been written at the end of 
the scenario and there are no duplicates.


--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26358) Operator maxParallelism is lost during chaining

2022-02-24 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26358:
---

 Summary: Operator maxParallelism is lost during chaining
 Key: FLINK-26358
 URL: https://issues.apache.org/jira/browse/FLINK-26358
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.3, 1.15.0
Reporter: Fabian Paul


During the generation of the JobGraph from the StreamGraph, the maxParallelism 
of the chained operators is lost, so the maxParallelism written to a snapshot 
might not reflect the real maxParallelism of the operator.

If a user now unchains the operator, it is not possible to restore a snapshot 
anymore because the maxParallelism of the operator and of the snapshot no 
longer match.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26304) GlobalCommitter can receive failed committables

2022-02-22 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26304:
---

 Summary: GlobalCommitter can receive failed committables
 Key: FLINK-26304
 URL: https://issues.apache.org/jira/browse/FLINK-26304
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Connectors / Common
Affects Versions: 1.14.3
Reporter: Fabian Paul
 Fix For: 1.14.4


After the introduction of retrying failed committables in the committer, it is 
important to only forward committables that have actually been committed to 
the GlobalCommitter. Currently, this is ignored and the Committer will forward 
all committables regardless of whether they succeeded [1].

 

[1] 
https://github.com/apache/flink/blob/0a76d632f33d9a69df87457a63043bd7f609ed40/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java#L144



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26254) KafkaSink might violate order of sequence numbers and risk exactly-once processing

2022-02-18 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26254:
---

 Summary: KafkaSink might violate order of sequence numbers and 
risk exactly-once processing
 Key: FLINK-26254
 URL: https://issues.apache.org/jira/browse/FLINK-26254
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.3, 1.15.0
Reporter: Fabian Paul


When running the KafkaSink in exactly-once mode with a very low checkpoint 
interval, users are seeing `OutOfOrderSequenceException`.

It could be caused by the fact that the connector has a pool of KafkaProducers 
and the sequence numbers are not shared/reset if a new KafkaProducer tries to 
write to a partition while the previous KafkaProducer is still occupied with 
committing.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26119) AsyncSinkWriterStateSerializer needs to be PublicEvolving

2022-02-14 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26119:
---

 Summary: AsyncSinkWriterStateSerializer needs to be PublicEvolving
 Key: FLINK-26119
 URL: https://issues.apache.org/jira/browse/FLINK-26119
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul


Developers who want to create an AsyncSink are supposed to use the 
AsyncSinkWriterStateSerializer to create the writer serializer. In light of 
the upcoming connector externalization, this class needs to be made 
PublicEvolving to guarantee some stability.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26118) AsyncSinks do not support downscaling with state

2022-02-14 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-26118:
---

 Summary: AsyncSinks do not support downscaling with state
 Key: FLINK-26118
 URL: https://issues.apache.org/jira/browse/FLINK-26118
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul


Currently, the AsyncSinkWriter assumes it is always restored from exactly one 
previous writer state, but in case of downscaling after a snapshot a writer 
will receive multiple states.

 

https://github.com/apache/flink/blob/c60eb0c3b4bf7dc045dd7a1da2080c7befebb8dc/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L438
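
A sketch of the needed change under hypothetical names: instead of expecting 
exactly one recovered state, the writer would accept a collection and merge 
the buffered entries.
{code:java}
// Hypothetical restore path: merge the buffered request entries of all
// recovered writer states (several after downscaling) instead of one.
private void initializeState(
        Collection<BufferedRequestState<RequestEntryT>> recoveredStates) {
    for (BufferedRequestState<RequestEntryT> state : recoveredStates) {
        bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
    }
}
{code}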



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25921) Support different input parallelism for preCommit topology

2022-02-02 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25921:
---

 Summary: Support different input parallelism for preCommit topology
 Key: FLINK-25921
 URL: https://issues.apache.org/jira/browse/FLINK-25921
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Affects Versions: 1.15.0, 1.16.0
Reporter: Fabian Paul


Currently, we assume that the pre-commit topology has the same parallelism as 
the preceding operator when inserting the failover region. To support a 
different parallelism, we might need to insert a different identity map to 
customize the mapping.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25920) Allow receiving updates of CommittableSummary

2022-02-02 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25920:
---

 Summary: Allow receiving updates of CommittableSummary
 Key: FLINK-25920
 URL: https://issues.apache.org/jira/browse/FLINK-25920
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Connectors / Common
Affects Versions: 1.15.0, 1.16.0
Reporter: Fabian Paul


In the case of unaligned checkpoints, it might happen that the checkpoint 
barrier overtakes the records and an empty committable summary is emitted that 
needs to be corrected at a later point when the records arrive.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25919) Sink V2 improvements and followups

2022-02-02 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25919:
---

 Summary: Sink V2 improvements and followups
 Key: FLINK-25919
 URL: https://issues.apache.org/jira/browse/FLINK-25919
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Connectors / Common
Affects Versions: 1.16.0
Reporter: Fabian Paul


This is an umbrella ticket for known limitations and improvements we still 
want to make for Sink V2: 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25857) Add committer metrics to track the status of committables

2022-01-27 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25857:
---

 Summary: Add committer metrics to track the status of committables
 Key: FLINK-25857
 URL: https://issues.apache.org/jira/browse/FLINK-25857
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Reporter: Fabian Paul


With Sink V2 we can now track the progress of a committable during committing 
and show metrics about the committing status (i.e. failed, retried, succeeded).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25726) Implement GlobalCommitter as custom post commit topology

2022-01-20 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25726:
---

 Summary: Implement GlobalCommitter as custom post commit topology
 Key: FLINK-25726
 URL: https://issues.apache.org/jira/browse/FLINK-25726
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul


The global committer was rarely used in Sink V1, so we decided to make it an 
extension that is only exposed to a specific set of users.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25608) Mark metrics as Public(Evolving)

2022-01-11 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25608:
---

 Summary: Mark metrics as Public(Evolving)
 Key: FLINK-25608
 URL: https://issues.apache.org/jira/browse/FLINK-25608
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.15.0
Reporter: Fabian Paul


With the introduction of architectural tests and the exposure of the metrics 
API to user-facing components the metrics classes also need proper annotations.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25591) Use FileSource for StreamExecutionEnvironment.readFiles

2022-01-10 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25591:
---

 Summary: Use FileSource for StreamExecutionEnvironment.readFiles
 Key: FLINK-25591
 URL: https://issues.apache.org/jira/browse/FLINK-25591
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream, Connectors / FileSystem
Reporter: Fabian Paul


Evaluate whether it is possible to replace the internals of 
StreamExecutionEnvironment.readFiles with the FileSource. The FileSource can 
already read the state of the previous version, but some functionality might 
be missing, e.g. nested file enumeration and file filtering.

If the replacement is not possible, we should add a new method based on the 
FileSource and deprecate the old one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25578) Graduate Sink V1 interfaces to PublicEvolving and deprecate them

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25578:
---

 Summary: Graduate Sink V1 interfaces to PublicEvolving and 
deprecate them
 Key: FLINK-25578
 URL: https://issues.apache.org/jira/browse/FLINK-25578
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Affects Versions: 1.15.0
Reporter: Fabian Paul


In the discussion of FLIP-191 we decided not to break existing sinks and to 
give them time to migrate to Sink V2.

We do not plan to delete Sink V1 in the near future and will continue to 
support it, but we won't develop new features for it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25575) Implement StreamGraph translation for Sink V2 interfaces

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25575:
---

 Summary: Implement StreamGraph translation for Sink V2 interfaces
 Key: FLINK-25575
 URL: https://issues.apache.org/jira/browse/FLINK-25575
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core, API / DataStream
Affects Versions: 1.15.0
Reporter: Fabian Paul


This task covers the translation from the user-defined interfaces and the 
extension topologies to the actual operators.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25574) Update Async Sink to use decomposed interfaces

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25574:
---

 Summary: Update Async Sink to use decomposed interfaces
 Key: FLINK-25574
 URL: https://issues.apache.org/jira/browse/FLINK-25574
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25573) Update Kafka Sink to use decomposed interfaces

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25573:
---

 Summary: Update Kafka Sink to use decomposed interfaces
 Key: FLINK-25573
 URL: https://issues.apache.org/jira/browse/FLINK-25573
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25572) Update File Sink to use decomposed interfaces

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25572:
---

 Summary: Update File Sink to use decomposed interfaces
 Key: FLINK-25572
 URL: https://issues.apache.org/jira/browse/FLINK-25572
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25571) Update Elasticsearch Sink to use decomposed interfaces

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25571:
---

 Summary: Update Elasticsearch Sink to use decomposed interfaces
 Key: FLINK-25571
 URL: https://issues.apache.org/jira/browse/FLINK-25571
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / ElasticSearch
Affects Versions: 1.15.0
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25570) Introduce Sink V2 extension APIs

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25570:
---

 Summary: Introduce Sink V2 extension APIs
 Key: FLINK-25570
 URL: https://issues.apache.org/jira/browse/FLINK-25570
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core, Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul
 Fix For: 1.15.0


This task introduces the interfaces needed to implement the custom operations 
before/after the writer and committer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25569) Introduce decomposed Sink V2 interfaces

2022-01-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25569:
---

 Summary: Introduce decomposed Sink V2 interfaces
 Key: FLINK-25569
 URL: https://issues.apache.org/jira/browse/FLINK-25569
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Fabian Paul
 Fix For: 1.15.0


This task introduces the interfaces described in [1] without the DataStream 
extension hooks.

 

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25555) FLIP-191: Extend unified Sink interface to support small file compaction

2022-01-06 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25555:
---

 Summary: FLIP-191: Extend unified Sink interface to support small 
file compaction
 Key: FLINK-25555
 URL: https://issues.apache.org/jira/browse/FLINK-25555
 Project: Flink
  Issue Type: New Feature
  Components: API / Core, API / DataStream, Connectors / Common
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25222) Remove NetworkFailureProxy used for Kafka connector tests

2021-12-08 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25222:
---

 Summary: Remove NetworkFailureProxy used for Kafka connector tests
 Key: FLINK-25222
 URL: https://issues.apache.org/jira/browse/FLINK-25222
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Test Infrastructure
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


Recently, the number of Kafka connector tests hitting a timeout due to either 
blocked networking or corrupted network responses increased significantly. 

We suspect our custom network failure implementation plays a part in this. 
Since all the affected tests are for the legacy FlinkKafkaProducer or 
FlinkKafkaConsumer, to which we will not add more features, we want to safely 
remove them to increase the overall stability.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25206) Add config option to disable configurations in the user program

2021-12-07 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25206:
---

 Summary: Add config option to disable configurations in the user 
program
 Key: FLINK-25206
 URL: https://issues.apache.org/jira/browse/FLINK-25206
 Project: Flink
  Issue Type: New Feature
  Components: Client / Job Submission, Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Fabian Paul


If you run a large number of Flink jobs, there is usually a governing entity 
controlling and ensuring the execution of all clusters.

To ease the control over configurations, we want to enable setting a 
configuration that disallows configurations in the user program. 

By default, this is turned off so the normal execution will not be affected.
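
A sketch of what such an option could look like (key, name, and default are 
illustrative only):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical option; permissive by default so normal execution is unaffected.
public static final ConfigOption<Boolean> ALLOW_PROGRAM_CONFIGURATION =
        ConfigOptions.key("execution.allow-program-configuration")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether the user program may alter the cluster configuration.");
{code}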



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25134) Unused RetryRule in KafkaConsumerTestBase swallows retries

2021-12-01 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25134:
---

 Summary: Unused RetryRule in KafkaConsumerTestBase swallows retries
 Key: FLINK-25134
 URL: https://issues.apache.org/jira/browse/FLINK-25134
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.13.3, 1.14.0, 1.15.0
Reporter: Fabian Paul


After merging https://issues.apache.org/jira/browse/FLINK-15493 a few tests are 
still not retried because the KafkaConsumerTestBase overwrites the RetryRule 
unnecessarily.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25041) E2E tar ball cache fails without error message if target directory not specified

2021-11-24 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-25041:
---

 Summary: E2E tar ball cache fails without error message if target 
directory not specified
 Key: FLINK-25041
 URL: https://issues.apache.org/jira/browse/FLINK-25041
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Affects Versions: 1.13.3, 1.14.0, 1.15.0
Reporter: Fabian Paul


We want to verify if the variable has been set.
{code:java}
if [ -z "$E2E_TARBALL_CACHE" ] ; then
  echo "You have to export the E2E Tarball Cache as E2E_TARBALL_CACHE"
  exit 1
fi
{code}
but the shown code immediately fails with an `unbound variable` error if the 
variable is not set (the script runs with `set -u`), so the branch is never 
evaluated.

 

We should change it to something like this
{code:java}
# ${VAR+x} expands to "x" only if VAR is set, so this branch runs when the
# variable is unset, without tripping `set -u`.
if [ -z "${E2E_TARBALL_CACHE+x}" ] ; then
  echo "You have to export the E2E Tarball Cache as E2E_TARBALL_CACHE"
  exit 1
fi
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24858) TypeSerializer version mismatch during eagerly restore

2021-11-10 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24858:
---

 Summary: TypeSerializer version mismatch during eagerly restore
 Key: FLINK-24858
 URL: https://issues.apache.org/jira/browse/FLINK-24858
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.13.3, 1.14.0, 1.15.0
Reporter: Fabian Paul


Currently, some of our TypeSerializer snapshots assume information about the 
binary layout of the actual data rather than only holding information about 
the TypeSerializer.

Multiple users ran into this problem, e.g. 
https://lists.apache.org/thread/4q5q7wp0br96op6p7f695q2l8lz4wfzx
{quote}This manifests itself when state is restored eagerly (for example an 
operator state) but, for example, a user doesn't register the state in their 
initializeState/open, and then a checkpoint happens.
The result is that we will have elements serialized according to an old binary 
layout, but our serializer snapshot declares a new version which indicates that 
the elements are written with a new binary layout.
The next restore will fail.
{quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24773) KafkaCommitter should fail on unknown Exception

2021-11-04 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24773:
---

 Summary: KafkaCommitter should fail on unknown Exception
 Key: FLINK-24773
 URL: https://issues.apache.org/jira/browse/FLINK-24773
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


Some of the exceptions during the committing phase are tolerable or even 
retriable, but generally, if an unknown exception is raised, we should fail 
the job.
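
A sketch of the intended classification inside the committer (the retry and 
handleKnownError helpers are hypothetical):
{code:java}
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;

try {
    producer.commitTransaction();
} catch (RetriableException e) {
    retry(committable); // hypothetical helper: schedule the committable for retry
} catch (ProducerFencedException e) {
    handleKnownError(committable, e); // hypothetical helper: known, tolerable case
} catch (Exception e) {
    // Unknown exception: fail the job instead of silently continuing.
    throw new RuntimeException("Unknown error while committing transaction", e);
}
{code}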



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24765) Upgrade Kafka dependency

2021-11-04 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24765:
---

 Summary: Upgrade Kafka dependency
 Key: FLINK-24765
 URL: https://issues.apache.org/jira/browse/FLINK-24765
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Fabian Paul


We rely on a very old Kafka version, 2.4.1, which was the last version 
supporting Scala 2.11. Since we dropped Scala 2.11, we can now update to a 
more recent one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24647) ClusterUncaughtExceptionHandler does not log the exception

2021-10-26 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24647:
---

 Summary: ClusterUncaughtExceptionHandler does not log the exception
 Key: FLINK-24647
 URL: https://issues.apache.org/jira/browse/FLINK-24647
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


If an uncaught exception occurs and the uncaught exception handler is 
configured to only log it, the stack trace of the exception is swallowed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24612) Kafka test container creates a large amount of logs

2021-10-21 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24612:
---

 Summary: Kafka test container creates a large amount of logs
 Key: FLINK-24612
 URL: https://issues.apache.org/jira/browse/FLINK-24612
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


When we use a testcontainer setup, we try to forward all container STDOUT logs 
to the surrounding test logger. Unfortunately, the Kafka loggers write a large 
number of logs by default because some of the internal loggers default to 
TRACE logging.

A good example is this test job 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=25084&view=logs&j=32a18cd8-d404-5807-996d-abcee436b891
where one of the tests got stuck and the generated artifact is ~25 GB. This 
makes debugging very hard because the file is hard to parse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24608) Sinks built with the unified sink framework do not receive timestamps when used in Table API

2021-10-20 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24608:
---

 Summary: Sinks built with the unified sink framework do not 
receive timestamps when used in Table API
 Key: FLINK-24608
 URL: https://issues.apache.org/jira/browse/FLINK-24608
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common, Table SQL / Planner
Affects Versions: 1.13.3, 1.14.0, 1.15.0
Reporter: Fabian Paul


All sinks built with the unified sink framework extract the timestamp from the 
internal {{StreamRecord}}. The Table API does not populate the timestamp field 
of the {{StreamRecord}} but extracts the timestamp from the actual data. 

We either have to use a dedicated operator before all the sinks to simulate 
the behavior or allow a customizable timestamp extraction during the sink 
translation.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24530) GlobalCommitter might not commit all records on drain

2021-10-13 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24530:
---

 Summary: GlobalCommitter might not commit all records on drain
 Key: FLINK-24530
 URL: https://issues.apache.org/jira/browse/FLINK-24530
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Connectors / Common
Affects Versions: 1.13.2, 1.14.0, 1.15.0
Reporter: Fabian Paul


If a pipeline has a unified Sink that implements a {{Committer}} and a 
{{GlobalCommitter}}, committables are forwarded from the {{Committer}} on 
either receiving {{notifyCheckpointCompleted}} or {{endInput}}. 

Since both methods are triggered by RPC calls, we currently do not guarantee 
an order in which either method is called on the {{Committer}} or 
{{GlobalCommitter}}. This can lead to the {{GlobalCommitter}} receiving the 
RPC call first and trying to commit although the {{Committer}} has forwarded 
nothing yet. For the normal streaming execution, it means that the visibility 
of the final commit could be delayed by one checkpoint.

Unfortunately, if the pipeline is stopped with a savepoint and drained, 
{{endInput}} might be called at the {{GlobalCommitter}} before the 
{{Committer}}, leading to potential data loss.

With FLIP-147 (checkpoints after task finish) fully implemented, the problem 
is mitigated because it guarantees the order of the final 
{{notifyCheckpointCompleted}} call, although it still suffers from the 
potentially delayed visibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24488) KafkaRecordSerializationSchemaBuilder does not forward timestamp

2021-10-08 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24488:
---

 Summary: KafkaRecordSerializationSchemaBuilder does not forward 
timestamp
 Key: FLINK-24488
 URL: https://issues.apache.org/jira/browse/FLINK-24488
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


When building a KafkaRecordSerializationSchema with its builder, the extracted 
event-time timestamp is currently not set on the resulting ProducerRecord.

https://github.com/apache/flink/blob/026675a5cb8a3704c51802fb549d6b0bc4759835/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java#L328
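
A sketch of the expected behavior (the topic and valueSerializationSchema 
fields are assumptions), using the ProducerRecord constructor that carries a 
timestamp:
{code:java}
@Override
public ProducerRecord<byte[], byte[]> serialize(
        T element, KafkaSinkContext context, Long timestamp) {
    byte[] value = valueSerializationSchema.serialize(element); // assumed field
    return new ProducerRecord<>(
            topic,     // assumed field
            null,      // partition: left to the partitioner
            timestamp, // forward the extracted event-time timestamp
            null,      // no key in this sketch
            value);
}
{code}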



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24397) Reduce TableSchema usage in Table API connectors

2021-09-28 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24397:
---

 Summary: Reduce TableSchema usage in Table API connectors
 Key: FLINK-24397
 URL: https://issues.apache.org/jira/browse/FLINK-24397
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common, Table SQL / API
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


The TableSchema class has been deprecated for a while, but a lot of connectors 
(DynamicTableSources and DynamicTableSinks) still depend on it. We should 
reduce the usage of TableSchema to provide a better example for external 
contributions and to eventually allow us to drop TableSchema.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24382) RecordsOut metric for sinks is inaccurate

2021-09-26 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24382:
---

 Summary: RecordsOut metric for sinks is inaccurate
 Key: FLINK-24382
 URL: https://issues.apache.org/jira/browse/FLINK-24382
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common, Runtime / Metrics
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


Currently, the metric is computed on the operator level, and it is assumed 
that every record flowing into the sink also generates one outgoing record.

This is often not accurate because sinks can transform incoming records into 
multiple outgoing records; thus the metric should be implemented by the sink 
implementors and not inferred by the framework.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24372) Deprecate Elasticsearch Sinkfunctions

2021-09-24 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24372:
---

 Summary: Deprecate Elasticsearch Sinkfunctions
 Key: FLINK-24372
 URL: https://issues.apache.org/jira/browse/FLINK-24372
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / ElasticSearch
Affects Versions: 1.15.0
Reporter: Fabian Paul


Once all other tickets of FLINK-24323 are resolved, we can mark the 
Elasticsearch sinks implementing SinkFunction as deprecated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24371) Support SinkWriter preCommit without the need of a committer

2021-09-24 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24371:
---

 Summary: Support SinkWriter preCommit without the need of a 
committer
 Key: FLINK-24371
 URL: https://issues.apache.org/jira/browse/FLINK-24371
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Connectors / Common
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


For some sinks, e.g. Elasticsearch, we only want to listen for the snapshot 
barrier; once it is received, the sink flushes the buffered records. These 
sinks do not snapshot any state and thus do not implement the 
`snapshotState()` method. 

We already have a NoopCommitHandler which swallows the passed committables, 
but it should at least call `preCommit()` on the SinkWriter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24368) Collect logs of Flink container in e2e source test suite

2021-09-24 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24368:
---

 Summary: Collect logs of Flink container in e2e source test suite
 Key: FLINK-24368
 URL: https://issues.apache.org/jira/browse/FLINK-24368
 Project: Flink
  Issue Type: Improvement
  Components: Test Infrastructure
Affects Versions: 1.14.0
Reporter: Fabian Paul


Currently, the logs from the Flink containers used for testing can only be 
accessed by manually looking into the container via docker commands. This 
makes debugging source e2e test failures on the CI very difficult.

I see two options:
 # Stream the logs to stdout while the test is running
 # After a test has failed, copy the logs to some log directory (this has the 
disadvantage that in case the container is stuck and the timeout daemon kills 
the test, we do not get a nice thread dump because the container is not part 
of the test JVM)







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24347) KafkaSource cannot checkpoint if the parallelism is higher than the partition number

2021-09-21 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24347:
---

 Summary: KafkaSource cannot checkpoint if the parallelism is 
higher than the partition number
 Key: FLINK-24347
 URL: https://issues.apache.org/jira/browse/FLINK-24347
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.13.2, 1.14.0
Reporter: Fabian Paul


The KafkaSourceEnumerator signals all readers when there are no more splits 
available, and the readers shut down. In case the parallelism is higher than 
the number of partitions of the consumed topic, there are not enough 
partitions to distribute, and reader subtasks go immediately into FINISHED 
state.
Currently, it is not possible to checkpoint if parts of the job are finished 
(this is lifted once FLIP-147 is enabled by default, Flink 1.15+).

We should only signal the readers to finish if the job is in bounded execution 
mode and keep them idling otherwise.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24308) Translate KafkaSink docs to chinese

2021-09-16 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24308:
---

 Summary: Translate KafkaSink docs to chinese
 Key: FLINK-24308
 URL: https://issues.apache.org/jira/browse/FLINK-24308
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.14.0
Reporter: Fabian Paul


With https://issues.apache.org/jira/browse/FLINK-23664 only the English 
documentation was updated. We also have to update the Chinese docs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24292) Update Flink's Kafka examples to use KafkaSink

2021-09-15 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24292:
---

 Summary: Update Flink's Kafka examples to use KafkaSink
 Key: FLINK-24292
 URL: https://issues.apache.org/jira/browse/FLINK-24292
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24282) KafkaRecordSerializationSchema TopicSelector is not serializable

2021-09-14 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24282:
---

 Summary: KafkaRecordSerializationSchema TopicSelector is not 
serializable
 Key: FLINK-24282
 URL: https://issues.apache.org/jira/browse/FLINK-24282
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.0
Reporter: Fabian Paul


To dynamically calculate the outgoing topic, we allow passing a lambda. 
Unfortunately, it is currently not marked as serializable, hence the following 
code fails during closure cleaning when used within a job.
 
{code:java}
KafkaRecordSerializationSchema.builder()
        .setTopic(topic)
        .setValueSerializationSchema(serSchema)
        .setPartitioner(partitioner)
        .build()
{code}

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24281) Migrate all existing tests to new Kafka Sink

2021-09-14 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24281:
---

 Summary: Migrate all existing tests to new Kafka Sink
 Key: FLINK-24281
 URL: https://issues.apache.org/jira/browse/FLINK-24281
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


The FlinkKafkaProducer has been deprecated since 1.14, but a lot of existing 
tests are still using it.

We should replace it with the KafkaSink, which completely subsumes it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24079) BufferedUpsertSinkFunction can lose records during failover

2021-08-31 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24079:
---

 Summary: BufferedUpsertSinkFunction can lose records during 
failover
 Key: FLINK-24079
 URL: https://issues.apache.org/jira/browse/FLINK-24079
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Table SQL / Ecosystem
Affects Versions: 1.13.2
Reporter: Fabian Paul


The internally used buffer is not snapshotted on checkpoint, which can lead to 
losing records on failure. We need to snapshot the buffer similarly to 
FLINK-23875.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24055) Deprecate FlinkKafkaConsumer

2021-08-30 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24055:
---

 Summary: Deprecate FlinkKafkaConsumer
 Key: FLINK-24055
 URL: https://issues.apache.org/jira/browse/FLINK-24055
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


With the introduction of the KafkaSource 
(https://issues.apache.org/jira/browse/FLINK-18323) we should deprecate the 
FlinkKafkaConsumer to nudge users to start the migration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24051) Make consumer.group-id optional for KafkaSource

2021-08-30 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24051:
---

 Summary: Make consumer.group-id optional for KafkaSource
 Key: FLINK-24051
 URL: https://issues.apache.org/jira/browse/FLINK-24051
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


For most users it is not necessary to set a group-id; the source itself can 
provide a meaningful group-id during startup.
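
A sketch of such a default, applied to the consumer properties during startup 
(the naming scheme is an assumption):
{code:java}
// Fall back to a generated group id when the user did not configure one.
String groupId = props.getProperty("group.id");
if (groupId == null) {
    props.setProperty("group.id", "flink-kafka-source-" + UUID.randomUUID());
}
{code}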



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24032) StreamSink does not receive periodic watermarks

2021-08-27 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24032:
---

 Summary: StreamSink does not receive periodic watermarks
 Key: FLINK-24032
 URL: https://issues.apache.org/jira/browse/FLINK-24032
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.14.0
Reporter: Fabian Paul


In the following scenario, the sink will never receive watermarks 
  
{code:java}
env.readFile(...)
.assignTimestampsAndWatermarks(format, file)
.rebalance()
.addSink(...);
{code}
I also noticed that when changing the code to the following the watermarks flow 
to the sink
{code:java}
env.readFile(...)
.assignTimestampsAndWatermarks(format, file)
.rebalance()
.process(new ProcessFunction() {...})
.addSink(...);
{code}
An example test case is accessible here 
[https://github.com/fapaul/flink/blob/9b749ac80cd128a7f288da45db313bafa39d8008/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java#L68]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24006) MailboxExecutorImplTest#testIsIdle does not test the correct behaviour

2021-08-26 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24006:
---

 Summary: MailboxExecutorImplTest#testIsIdle does not test the 
correct behaviour
 Key: FLINK-24006
 URL: https://issues.apache.org/jira/browse/FLINK-24006
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task, Tests
Affects Versions: 1.13.2, 1.12.5, 1.14.0
Reporter: Fabian Paul


The test was introduced to ensure that the mailbox idleness metric still 
counts new messages although the mailbox loop might have been stopped.

Unfortunately, the test currently does not stop the mailbox processor, which 
means the test passes even without the actual code changes of 
https://issues.apache.org/jira/browse/FLINK-19109



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23875) ReducingUpsertSink can lose records during failover

2021-08-19 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23875:
---

 Summary: ReducingUpsertSink can lose records during failover
 Key: FLINK-23875
 URL: https://issues.apache.org/jira/browse/FLINK-23875
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Table SQL / API
Affects Versions: 1.14.0
Reporter: Fabian Paul


When trying to rework the Table API Kafka connector to make it compatible with 
the new KafkaSink, I noticed that the buffer used to reduce the update-before 
and update-after calls is currently not snapshotted, which can result in data 
loss if the job fails while the buffer is not empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23838) Add FLIP-33 metrics to new KafkaSink

2021-08-17 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23838:
---

 Summary: Add FLIP-33 metrics to new KafkaSink
 Key: FLINK-23838
 URL: https://issues.apache.org/jira/browse/FLINK-23838
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Reporter: Fabian Paul
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23814) Test FLIP-143 KafkaSink

2021-08-16 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23814:
---

 Summary: Test FLIP-143 KafkaSink
 Key: FLINK-23814
 URL: https://issues.apache.org/jira/browse/FLINK-23814
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Reporter: Fabian Paul
 Fix For: 1.14.0


The following scenarios are worthwhile to test:
 * Start a simple job with none/at-least-once delivery guarantee and write 
records to a Kafka topic
 * Start a simple job with exactly-once delivery guarantee and write records 
to a Kafka topic. The records should only be visible to a `read-committed` 
consumer
 * Stop a job with exactly-once delivery guarantee and restart it with 
different parallelism (scale-down, scale-up)
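
A minimal job skeleton for these scenarios (broker, topic, and data are 
placeholders):
{code:java}
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092") // placeholder broker
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("test-topic") // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flip143-test") // required for exactly-once
        .build();

env.fromSequence(0, 1_000)
        .map(String::valueOf)
        .sinkTo(sink);
env.execute("FLIP-143 KafkaSink test");
{code}
For the exactly-once scenario, check visibility with a consumer configured 
with isolation.level=read_committed.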



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23735) Migrate BufferedUpsertSinkFunction to FLIP-143

2021-08-12 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23735:
---

 Summary: Migrate BufferedUpsertSinkFunction to FLIP-143
 Key: FLINK-23735
 URL: https://issues.apache.org/jira/browse/FLINK-23735
 Project: Flink
  Issue Type: Sub-task
Reporter: Fabian Paul


The BufferedUpsertSinkFunction is still using the old sink interfaces and 
relies on the old Kafka DataStream connector FlinkKafkaProducer.

We need to migrate it to the new Sink API to also leverage the new KafkaSink 
connector and finally deprecate the FlinkKafkaProducer and all its belongings.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23710) Move sink to org.apache.flink.connector.kafka.sink package

2021-08-10 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23710:
---

 Summary: Move sink to org.apache.flink.connector.kafka.sink package
 Key: FLINK-23710
 URL: https://issues.apache.org/jira/browse/FLINK-23710
 Project: Flink
  Issue Type: Sub-task
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23664) Write documentation for new KafkaSink

2021-08-06 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23664:
---

 Summary: Write documentation for new KafkaSink
 Key: FLINK-23664
 URL: https://issues.apache.org/jira/browse/FLINK-23664
 Project: Flink
  Issue Type: Sub-task
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23640) Create a KafkaRecordSerializationSchemas valueOnly helper

2021-08-05 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23640:
---

 Summary: Create a KafkaRecordSerializationSchemas valueOnly helper
 Key: FLINK-23640
 URL: https://issues.apache.org/jira/browse/FLINK-23640
 Project: Flink
  Issue Type: Sub-task
Reporter: Fabian Paul


Commonly, users only want to serialize the value of a Kafka record if they are 
not using a key schema. For these users, we can provide an easier entry point 
comparable to {{KafkaValueOnlyDeserializerWrapper}}.
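
A sketch of how such an entry point could look next to today's builder (the 
valueOnly helper below is hypothetical):
{code:java}
// Hypothetical convenience entry point:
KafkaRecordSerializationSchema<String> schema =
        KafkaRecordSerializationSchemas.valueOnly("my-topic", new SimpleStringSchema());

// Roughly equivalent to what the builder requires today:
KafkaRecordSerializationSchema<String> explicit =
        KafkaRecordSerializationSchema.builder()
                .setTopic("my-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build();
{code}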



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23639) Migrate Table API to new KafkaSink

2021-08-05 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23639:
---

 Summary: Migrate Table API to new KafkaSink
 Key: FLINK-23639
 URL: https://issues.apache.org/jira/browse/FLINK-23639
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka, Table SQL / API
Reporter: Fabian Paul


With the KafkaSink ported to FLIP-143, we should also adapt the Table API to 
leverage the new KafkaSink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23248) SinkWriter is not closed when failing

2021-07-05 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23248:
---

 Summary: SinkWriter is not closed when failing
 Key: FLINK-23248
 URL: https://issues.apache.org/jira/browse/FLINK-23248
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.12.4, 1.13.1, 1.14.0
Reporter: Fabian Paul


Currently, the SinkWriter is only closed when the operator finishes in 
`AbstractSinkWriterOperator#close()`, but we must also close the SinkWriter in 
`AbstractSinkWriterOperator#dispose()` to release any acquired resources when 
the job fails.
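
A minimal sketch of the fix, assuming the operator keeps its writer in a `sinkWriter` field:

{code:java}
// Sketch: close the writer on dispose() as well, since dispose() is the
// path taken on failure; close() alone only covers the success path.
@Override
public void dispose() throws Exception {
    try {
        if (sinkWriter != null) {
            sinkWriter.close();
        }
    } finally {
        // Run the regular operator disposal even if closing the writer throws.
        super.dispose();
    }
}
{code}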




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23124) Implement exactly-once Kafka Sink

2021-06-23 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23124:
---

 Summary: Implement exactly-once Kafka Sink
 Key: FLINK-23124
 URL: https://issues.apache.org/jira/browse/FLINK-23124
 Project: Flink
  Issue Type: Sub-task
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23123) Implement at-least-once Kafka Sink

2021-06-23 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-23123:
---

 Summary: Implement at-least-once Kafka Sink
 Key: FLINK-23123
 URL: https://issues.apache.org/jira/browse/FLINK-23123
 Project: Flink
  Issue Type: Sub-task
Reporter: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22696) Enable Confluent Schema Registry Test on jdk 11

2021-05-18 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-22696:
---

 Summary: Enable Confluent Schema Registry Test on jdk 11
 Key: FLINK-22696
 URL: https://issues.apache.org/jira/browse/FLINK-22696
 Project: Flink
  Issue Type: Improvement
  Components: Test Infrastructure
Reporter: Fabian Paul
Assignee: Fabian Paul






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22434) Dispatcher does not store suspended jobs in execution graph store

2021-04-23 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-22434:
---

 Summary: Dispatcher does not store suspended jobs in execution 
graph store
 Key: FLINK-22434
 URL: https://issues.apache.org/jira/browse/FLINK-22434
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Fabian Paul
Assignee: Fabian Paul
 Fix For: 1.11.4, 1.14.0, 1.12.3, 1.13.1


Only globally terminated jobs are currently stored in the execution graph store 
after termination. If the JobManager is shut down while jobs are still running, 
these jobs are suspended, which is a non-globally terminated state.

The problem surfaces when a user tries to access information about the job 
during termination, leading to a job-not-found response. Storing all terminated 
jobs in the execution graph store should fix this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22257) Clarify Flink ConfigOptions Usage

2021-04-13 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-22257:
---

 Summary: Clarify Flink ConfigOptions Usage
 Key: FLINK-22257
 URL: https://issues.apache.org/jira/browse/FLINK-22257
 Project: Flink
  Issue Type: Improvement
Reporter: Fabian Paul


For users, it is hard to determine which ConfigOptions are relevant for the 
different stages of a Flink application, from the translation of the user 
program to its execution on the cluster, and in particular which options can be 
configured through which of the different channels (an example follows the list):
 * Cluster configuration (i.e. flink-conf.yaml)
 * Application configuration, code-based
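
As an illustration of the two channels, the same option can be set in the cluster configuration or in application code; the checkpointing interval here is only an example option:

{code:java}
// Channel 1, cluster configuration (flink-conf.yaml):
//   execution.checkpointing.interval: 10 s
//
// Channel 2, code-based application configuration:
Configuration conf = new Configuration();
conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
{code}

Whether and where each such option is actually picked up is exactly what is hard to determine today.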




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22256) Persist checkpoint type information

2021-04-13 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-22256:
---

 Summary: Persist checkpoint type information
 Key: FLINK-22256
 URL: https://issues.apache.org/jira/browse/FLINK-22256
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Fabian Paul


As a user, it is retrospectively difficult to determine what kind of checkpoint 
(e.g. incremental, unaligned, ...) was performed when looking only at the 
persisted checkpoint metadata.

The only way would be to look into the execution configuration of the job, 
which might not be available anymore and can be scattered across the 
application code and cluster configuration.

It would be highly beneficial if such information were part of the persisted 
metadata, so that these external pointers do not have to be tracked.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21879) ActiveResourceManagerTest.testWorkerRegistrationTimeoutNotCountingAllocationTime fails on AZP

2021-03-19 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-21879:
---

 Summary: 
ActiveResourceManagerTest.testWorkerRegistrationTimeoutNotCountingAllocationTime
 fails on AZP
 Key: FLINK-21879
 URL: https://issues.apache.org/jira/browse/FLINK-21879
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Fabian Paul


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15047&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=6760
{code:java}
[ERROR] testWorkerRegistrationTimeoutNotCountingAllocationTime(org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest)  Time elapsed: 0.388 s  <<< FAILURE!
java.lang.AssertionError: 

Expected: an instance of org.apache.flink.runtime.registration.RegistrationResponse$Success
 but:  is a org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationRejection
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
    at org.junit.Assert.assertThat(Assert.java:956)
    at org.junit.Assert.assertThat(Assert.java:923)
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest$13.lambda$new$2(ActiveResourceManagerTest.java:789)
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest$Context.runTest(ActiveResourceManagerTest.java:857)
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest$13.<init>(ActiveResourceManagerTest.java:770)
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.testWorkerRegistrationTimeoutNotCountingAllocationTime(ActiveResourceManagerTest.java:753)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18501) Mapping of Pluggable Filesystems to scheme is not properly logged

2020-07-06 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-18501:
---

 Summary: Mapping of Pluggable Filesystems to scheme is not 
properly logged
 Key: FLINK-18501
 URL: https://issues.apache.org/jira/browse/FLINK-18501
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Affects Versions: 1.10.0, 1.11.0
Reporter: Fabian Paul


Currently, the following statement is used during FS instantiation
{code:java}
LOG.debug("Added file system {}:{}", factory.getScheme(), 
factory.getClass().getName());
{code}
but with the introduction of pluggable filesystems, the class name will always 
be PluginFileSystemFactory, which no longer indicates which FS is actually used.

A fix could be to replace the class-name usage with toString() and implement 
toString() within PluginFileSystemFactory.
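
A minimal sketch of that idea; the delegate field name `inner` is an assumption:

{code:java}
// Sketch: let the wrapping factory's toString() reveal the factory it
// delegates to, then log the factory itself instead of its class name.
@Override
public String toString() {
    return "PluginFileSystemFactory(" + inner.getClass().getName() + ")";
}

// Call site:
LOG.debug("Added file system {}:{}", factory.getScheme(), factory);
{code}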



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16883) No support for log4j2 configuration formats besides properties

2020-03-31 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-16883:
---

 Summary: No support for log4j2 configuration formats besides 
properties
 Key: FLINK-16883
 URL: https://issues.apache.org/jira/browse/FLINK-16883
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client
Affects Versions: 1.11.0
Reporter: Fabian Paul


If `flink-console.sh` is used to start a Flink cluster, the env java opts 
precede the log settings 
([link|https://github.com/apache/flink/blob/1444fd68115594b872201242886b4d789f4b26a5/flink-dist/src/main/flink-bin/bin/flink-console.sh#L73]).

This way, the log setting `log4j.configurationFile` always overwrites keys set 
earlier. Since `log4j.configurationFile` is set to `log4j.properties`, it is 
not possible to use other configuration formats than properties.

My proposal would be to switch the order so that the log settings precede the 
env java opts. Users could then overwrite the default file with their own 
configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16570) Difficulties to select correct metric with long name in dropdown of Flink UI task menu

2020-03-12 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-16570:
---

 Summary: Difficulties to select correct metric with long name in 
dropdown of Flink UI task menu
 Key: FLINK-16570
 URL: https://issues.apache.org/jira/browse/FLINK-16570
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Fabian Paul
 Attachments: metrics_dropdown.png

As seen in the attached image, it is currently difficult to select the correct 
metric when the metric name exceeds the width of the dropdown, because the full 
name cannot be seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16553) KafkaFetcher topic/partition metrics

2020-03-11 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-16553:
---

 Summary: KafkaFetcher topic/partition metrics
 Key: FLINK-16553
 URL: https://issues.apache.org/jira/browse/FLINK-16553
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka, Runtime / Metrics
Reporter: Fabian Paul


When using the universal Kafka connector, currently not all KafkaFetcher 
metrics 
([link|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java]) 
that are exposed through the KafkaConsumer are accessible within the Flink 
metrics system.

In particular, all metrics related to topics and partitions are not available. 
The KafkaConsumer internally only registers those metrics after it has fetched 
some records.

Unfortunately, at the moment Flink only checks the available metrics right 
after the initialization of the KafkaConsumer, when no records have been polled 
yet.
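
For illustration, with a plain KafkaConsumer the per-topic metrics only appear in {{metrics()}} once records have actually been fetched (a sketch, not Flink code; consumer setup is assumed):

{code:java}
// Before the first poll, consumer.metrics() contains no per-topic entries.
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

// After records were fetched, topic/partition metrics are registered.
for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    if (entry.getKey().tags().containsKey("topic")) {
        System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
    }
}
{code}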



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16525) TwoPhaseCommitSinkFunction subtask logs misleading name

2020-03-10 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-16525:
---

 Summary: TwoPhaseCommitSinkFunction subtask logs misleading name
 Key: FLINK-16525
 URL: https://issues.apache.org/jira/browse/FLINK-16525
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Reporter: Fabian Paul


The current name() function in TwoPhaseCommitSinkFunction tries to describe the 
currently running subtask with its class name, the index of the subtask and the 
number of parallel subtasks.

Since the subtask index starts at 0 while the parallelism count starts at 1, 
this can lead to the following log message.
{code:java}
15:59:41,448 INFO  
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - 
FlinkKafkaProducer 0/1 - checkpoint 1 complete, committing transaction 
TransactionHolder{handle=KafkaTransactionState [transactionalId=null, 
producerId=-1, epoch=-1], transactionStartTime=1583852371370} from checkpoint 1
{code}
Although only one subtask is running, it is described as 0/1, which might 
suggest that more than one subtask exists.

I would suggest incrementing the first number after the class name by 1, so 
that the numbering better indicates how many subtasks are running.
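
A minimal sketch of that suggestion:

{code:java}
// Sketch: shift the 0-based subtask index to 1-based so a single running
// subtask logs as "1/1" instead of the misleading "0/1".
private String name() {
    return String.format(
            "%s %d/%d",
            this.getClass().getSimpleName(),
            getRuntimeContext().getIndexOfThisSubtask() + 1,
            getRuntimeContext().getNumberOfParallelSubtasks());
}
{code}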




--
This message was sent by Atlassian Jira
(v8.3.4#803005)