[jira] [Commented] (FLINK-36162) Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot jobReference
[ https://issues.apache.org/jira/browse/FLINK-36162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877194#comment-17877194 ] Thomas Weise commented on FLINK-36162: -- [~gyfora] I was surprised when you closed your PR but now it makes sense :) Agreed that it is better to remove these fields; they aren't solid enough and could be hard to straighten out in the future. They also don't solve anything beyond what initialSavepointPath already covers. > Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot > jobReference > - > > Key: FLINK-36162 > URL: https://issues.apache.org/jira/browse/FLINK-36162 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Blocker > > I think in the initial version we should remove both the newly introduced > job.spec.flinkStateSnapshotReference and > FlinkStateSnapshot.jobReference.namespace fields, as they generally allow users > to trigger and access savepoint paths from namespaces where the user may not > have permissions. > Let me give you two examples: > jobReference.namespace allows us to trigger a savepoint for a job in a > different namespace. This works as long as the operator has access to the > user and does not verify that the current user in fact does. This may > ultimately allow us to trigger a savepoint to a custom place and even steal > the state. > In a similar way, the initial flinkStateSnapshot reference would allow us to > steal a savepoint path that we normally don't know/have access to and store > it in our resource. > I suggest simply removing these until we have a good way to solve these > issues; I think there is generally not much use for these fields overall. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36145) Change JobSpec.flinkStateSnapshotReference to snapshotReference
[ https://issues.apache.org/jira/browse/FLINK-36145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876534#comment-17876534 ] Thomas Weise commented on FLINK-36145: -- `snapshotReference` sounds a bit misleading; `initialStateReference` is better. I think the "initial" part is actually important, so that it is clear that the reference only applies until the next snapshot. > Change JobSpec.flinkStateSnapshotReference to snapshotReference > --- > > Key: FLINK-36145 > URL: https://issues.apache.org/jira/browse/FLINK-36145 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Blocker > Labels: pull-request-available > Fix For: kubernetes-operator-1.10.0 > > > To avoid redundant / verbose naming we should change this field name in the > spec before it's released: > JobSpec.flinkStateSnapshotReference -> JobSpec.snapshotReference or > JobSpec.stateSnapshotReference -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching
[ https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864421#comment-17864421 ] Thomas Weise commented on FLINK-33545: -- I agree that even if we fix the underlying bug in FLINK-35749, it would still be useful to have this guard to detect situations where loss of data would otherwise be hard to spot. It could also be made optional behind a flag if the overhead is significant or a concern. And [~arvid], glad to see you back active in the Flink community! > KafkaSink implementation can cause dataloss during broker issue when not > using EXACTLY_ONCE if there's any batching > --- > > Key: FLINK-33545 > URL: https://issues.apache.org/jira/browse/FLINK-33545 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.18.0 >Reporter: Kevin Tseng >Assignee: Kevin Tseng >Priority: Major > Labels: pull-request-available > > In the current implementation of KafkaSource and KafkaSink there are some > assumptions that were made: > # KafkaSource completely relies on Checkpoint to manage and track its offset > in *KafkaSourceReader* class > # KafkaSink in *KafkaWriter* class only performs a catch-up flush when > *DeliveryGuarantee.EXACTLY_ONCE* is specified. > KafkaSource is assuming that checkpoint should be properly fenced and > everything it had read up until the checkpoint was initiated will be processed or > recorded by operators downstream, including the TwoPhaseCommitter such as > *KafkaSink* > *KafkaSink* goes by the model of: > > {code:java} > flush -> prepareCommit -> commit{code} > > In a scenario where: > * KafkaSource ingested records #1 to #100 > * KafkaSink only had a chance to send records #1 to #96 > * with a batching interval of 5ms > when the checkpoint has been initiated, flush will only confirm the sending of > records #1 to #96. > This allows the checkpoint to proceed as there's no error, and records #97 to #100 > will be batched after the first flush. 
> Now, if the broker goes down or has an issue that causes the internal KafkaProducer > to be unable to send out the records after a batch, and the producer is stuck in a constant > retry cycle (the default value of KafkaProducer retries is Integer.MAX_VALUE), > *WriterCallback* error handling will never be triggered until the next > checkpoint flush. > This can be tested by creating a faulty Kafka cluster and running the following > code: > {code:java} > Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER); > props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer"); > props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); > props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); > props.put(ProducerConfig.ACKS_CONFIG, "all"); > final KafkaProducer<String, String> producer = new KafkaProducer<>(props); > try { > for (int i = 0; i < 10; i++) { > System.out.printf("sending record #%d\n", i); > String data = UUID.randomUUID().toString(); > final ProducerRecord<String, String> record = new > ProducerRecord<>(TOPIC, Integer.toString(i), data); > // CB is the reporter's Callback implementation (not shown) > producer.send(record, new CB(Integer.toString(i), data)); > Thread.sleep(1); // sleep for 1 ms > } > } catch (Exception e) { > e.printStackTrace(); > } finally { > System.out.println("flushing"); > producer.flush(); > System.out.println("closing"); > producer.close(); > }{code} > Once the callback fires due to a network timeout, it will cause Flink to restart > from the previously saved checkpoint (which recorded reading up to record #100), > but KafkaWriter never sent records #97 to #100. 
> This will result in data loss of records #97 to #100. > Because KafkaWriter only catches errors *after* the callback, if the callback is never > invoked (due to a broker issue) right after the first flush has taken place, > those records are effectively gone unless someone decides to go back and look > for them. > This behavior would be OK if the user has set {*}DeliveryGuarantee.NONE{*}, but > is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}. > There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}. > prepareCommit will produce a list of KafkaCommittables that correspond to the > transactional KafkaProducers to be committed, and a catch-up flush will take > place during the *commit* step. Whether this was intentional or not, due to the > fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the > end of EXACTLY_ONCE actually ensured everything f
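Thomas's comment argues for a guard that makes this kind of loss visible even once the underlying bug (FLINK-35749) is fixed. A minimal sketch of what such a guard could look like, assuming it simply counts records that were handed to the producer but whose callback has not fired yet. The class and method names are hypothetical; this is not the code from the actual PR.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical guard (not the actual KafkaWriter code): counts records handed
 * to an asynchronous producer and the callbacks that complete them, so that a
 * pre-commit check can fail loudly instead of letting the checkpoint succeed
 * while records are still in flight.
 */
final class PendingRecordsGuard {
    private final AtomicLong pending = new AtomicLong();

    /** Call right before passing a record to the producer's send(). */
    void beforeSend() {
        pending.incrementAndGet();
    }

    /** Call from the send callback, whether the send succeeded or failed. */
    void onCompletion() {
        pending.decrementAndGet();
    }

    long pendingCount() {
        return pending.get();
    }

    /**
     * Call after the producer's flush() has returned and before the checkpoint
     * is allowed to proceed; throws if any record is still unacknowledged.
     */
    void checkNothingPending() {
        long stillPending = pending.get();
        if (stillPending != 0) {
            throw new IllegalStateException(
                    stillPending + " record(s) still pending after flush");
        }
    }
}
```

In a writer, beforeSend() would wrap each producer.send(...) call and onCompletion() would run inside the Kafka Callback. The counter is an AtomicLong because Kafka invokes callbacks on the producer's I/O thread rather than on the task thread. Where exactly the check must run to catch the interleaving described in this issue is an assumption of the sketch, not something the issue specifies.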
[jira] [Commented] (FLINK-16686) [State TTL] Make user class loader available in native RocksDB compaction thread
[ https://issues.apache.org/jira/browse/FLINK-16686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847467#comment-17847467 ] Thomas Weise commented on FLINK-16686: -- Flink 1.17: {code:java} Exception in thread "Thread-14" java.lang.IllegalArgumentException: classLoader cannot be null. at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:550) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:391) at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205) at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191) {code} > [State TTL] Make user class loader available in native RocksDB compaction > thread > > > Key: FLINK-16686 > URL: https://issues.apache.org/jira/browse/FLINK-16686 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.8.0, 1.11.3, 1.13.0, 1.12.3, 1.17.0 >Reporter: Andrey Zagrebin >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > The issue was initially reported > [here|https://stackoverflow.com/questions/60745711/flink-kryo-serializer-because-chill-serializer-couldnt-be-found]. > The problem is that the Java code of the Flink compaction filter is called from > RocksDB native C++ code. It is called in the context of the native compaction > thread. RocksDB has utilities to create a Java thread context for the Flink > Java callback. Presumably, the Java thread context class loader is not set at > all, and if it is queried, it produces a NullPointerException. 
> The provided report enabled a list state with TTL. The compaction filter has > to deserialise elements to check expiration. The deserialiser relies on Kryo > which queries the thread context class loader which is expected to be the > user class loader of the task but turns out to be null. > We should investigate how to pass the user class loader to the compaction > thread of the list state with TTL. -- This message was sent by Atlassian Jira (v8.20.10#820010)
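The description suggests the compaction filter's Java callback runs on a thread whose context class loader is null, which the stack trace above confirms for Kryo. A common way to handle this kind of problem is to capture the user class loader when the callback object is created and install it around each invocation, restoring the previous loader afterwards. A minimal sketch, assuming the user class loader is available when the filter is constructed; the class name is hypothetical (Flink's own code generally uses `TemporaryClassLoaderContext` for the same save-and-restore idea):

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: runs an action with a given thread context class loader
 * and restores the previous one afterwards, even if the action throws.
 */
final class ContextClassLoaderScope {
    private ContextClassLoaderScope() {}

    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        // May be null on a thread attached from native code.
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```

A compaction-filter callback would then wrap its deserialization in `ContextClassLoaderScope.callWith(userClassLoader, () -> ...)`, so that Kryo sees the task's user class loader instead of null.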
[jira] [Updated] (FLINK-16686) [State TTL] Make user class loader available in native RocksDB compaction thread
[ https://issues.apache.org/jira/browse/FLINK-16686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-16686: - Affects Version/s: 1.17.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-28048) Introduce Source API alternative to FiniteTestSource
[ https://issues.apache.org/jira/browse/FLINK-28048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-28048. -- Fix Version/s: 1.20.0 Resolution: Implemented > Introduce Source API alternative to FiniteTestSource > > > Key: FLINK-28048 > URL: https://issues.apache.org/jira/browse/FLINK-28048 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common, Tests >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > This also has to verify that Iceberg connector tests mentioned in FLINK-28054 > also get covered by the solution. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-25565) Write and Read Parquet INT64 Timestamp
[ https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-25565: - Fix Version/s: 1.18.1 > Write and Read Parquet INT64 Timestamp > -- > > Key: FLINK-25565 > URL: https://issues.apache.org/jira/browse/FLINK-25565 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.12.0, 1.15.0 >Reporter: Bo Cui >Assignee: Bo Cui >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0, 1.18.1 > > > Flink cannot read parquet files that contain INT64 Timestamp generated by > Spark -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25565) Write and Read Parquet INT64 Timestamp
[ https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794287#comment-17794287 ] Thomas Weise commented on FLINK-25565: -- [~martijnvisser], the default behavior did not change. It is necessary to opt into the INT64 mapping; the PR added the config option (write.int64.timestamp) to the same page. I agree it would be helpful if the mapping table itself also mentioned it, though. > Write and Read Parquet INT64 Timestamp > -- > > Key: FLINK-25565 > URL: https://issues.apache.org/jira/browse/FLINK-25565 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.12.0, 1.15.0 >Reporter: Bo Cui >Assignee: Bo Cui >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > > Flink cannot read parquet files that contain INT64 Timestamp generated by > Spark -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25565) Write and Read Parquet INT64 Timestamp
[ https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794237#comment-17794237 ] Thomas Weise commented on FLINK-25565: -- It turns out that without this option, tables with timestamp columns written by Spark and probably other tools cannot be consumed. Thanks [~Bo Cui] and sorry for the long wait. > Write and Read Parquet INT64 Timestamp > -- > > Key: FLINK-25565 > URL: https://issues.apache.org/jira/browse/FLINK-25565 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.12.0, 1.15.0 >Reporter: Bo Cui >Assignee: Bo Cui >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > > Flink cannot read parquet files that contain INT64 Timestamp generated by > Spark -- This message was sent by Atlassian Jira (v8.20.10#820010)
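For context on what an INT64 timestamp holds: in the Parquet format, an INT64 column annotated with the TIMESTAMP logical type stores a count of milliseconds, microseconds or nanoseconds since the Unix epoch. A minimal decoding sketch, assuming the unit has already been read from the column's annotation and that values are interpreted as UTC wall-clock time; the class name is hypothetical and this is not Flink's Parquet reader:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class Int64Timestamps {
    /** Units a Parquet TIMESTAMP logical type can annotate an INT64 column with. */
    enum Unit { MILLIS, MICROS, NANOS }

    private Int64Timestamps() {}

    /** Converts an INT64 value (units since 1970-01-01T00:00:00Z) to a UTC LocalDateTime. */
    static LocalDateTime toUtcLocalDateTime(long value, Unit unit) {
        long unitsPerSecond;
        long nanosPerUnit;
        switch (unit) {
            case MILLIS:
                unitsPerSecond = 1_000L;
                nanosPerUnit = 1_000_000L;
                break;
            case MICROS:
                unitsPerSecond = 1_000_000L;
                nanosPerUnit = 1_000L;
                break;
            default: // NANOS
                unitsPerSecond = 1_000_000_000L;
                nanosPerUnit = 1L;
                break;
        }
        // floorDiv/floorMod keep the sub-second part non-negative for pre-1970 values.
        long seconds = Math.floorDiv(value, unitsPerSecond);
        long nanoOfSecond = Math.floorMod(value, unitsPerSecond) * nanosPerUnit;
        return LocalDateTime.ofEpochSecond(seconds, (int) nanoOfSecond, ZoneOffset.UTC);
    }
}
```

For example, the MICROS value 1000000 decodes to 1970-01-01T00:00:01.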
[jira] [Resolved] (FLINK-25565) Write and Read Parquet INT64 Timestamp
[ https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-25565. -- Fix Version/s: 1.19.0 Resolution: Fixed > Write and Read Parquet INT64 Timestamp > -- > > Key: FLINK-25565 > URL: https://issues.apache.org/jira/browse/FLINK-25565 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.12.0, 1.15.0 >Reporter: Bo Cui >Assignee: Bo Cui >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > > Flink cannot read parquet files that contain INT64 Timestamp generated by > Spark -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-27529) HybridSourceSplitEnumerator sourceIndex using error Integer check
[ https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-27529. -- Resolution: Fixed > HybridSourceSplitEnumerator sourceIndex using error Integer check > - > > Key: FLINK-27529 > URL: https://issues.apache.org/jira/browse/FLINK-27529 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.14.4, 1.15.0, 1.15.1 >Reporter: Ran Tao >Assignee: Ran Tao >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > > Currently, HybridSourceSplitEnumerator compares readerSourceIndex values, which are of type Integer, > using the == operator. > By definition, a hybrid source can concatenate more than two child sources, so > the current code only works because of the Integer cache (which by default only covers values up to 127); with more > sources it will fail. In short, we can't use == to compare Integer > indices unless we limit hybrid source indices to <= 127. > For example: > {code:java} > Integer i1 = 128; > Integer i2 = 128; > System.out.println(i1 == i2); > int i3 = 128; > int i4 = 128; > System.out.println((Integer) i3 == (Integer) i4); > {code} > This prints false, false. > The HybridSource Integer index comparison is shown below: > {code:java} > @Override > public Map<Integer, ReaderInfo> registeredReaders() { > > Integer lastIndex = null; > for (Integer sourceIndex : readerSourceIndex.values()) { > if (lastIndex != null && lastIndex != sourceIndex) { > return filterRegisteredReaders(readers); > } > lastIndex = sourceIndex; > } > return readers; > } > private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) { > Map<Integer, ReaderInfo> readersForSource = new > HashMap<>(readers.size()); > for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) { > if (readerSourceIndex.get(e.getKey()) == (Integer) > sourceIndex) { > readersForSource.put(e.getKey(), e.getValue()); > } > } > return readersForSource; > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
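One way to fix comparisons like these is to compare values with `Objects.equals` (or unbox to `int`) instead of comparing `Integer` references. A minimal sketch; `String` stands in for Flink's `ReaderInfo` so the example has no Flink dependency, and the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class SourceIndexCheck {
    private SourceIndexCheck() {}

    /** Buggy variant: compares references, reliable only when both come from the Integer cache. */
    static boolean sameByReference(Integer a, Integer b) {
        return a == b;
    }

    /** Fixed variant: compares values via Integer.equals and tolerates nulls. */
    static boolean sameByValue(Integer a, Integer b) {
        return Objects.equals(a, b);
    }

    /** Keeps only the readers whose source index equals {@code sourceIndex}. */
    static Map<Integer, String> filterBySourceIndex(
            Map<Integer, String> readers,
            Map<Integer, Integer> readerSourceIndex,
            int sourceIndex) {
        Map<Integer, String> readersForSource = new HashMap<>(readers.size());
        for (Map.Entry<Integer, String> e : readers.entrySet()) {
            // sourceIndex is boxed once; Objects.equals then compares values, not references.
            if (Objects.equals(readerSourceIndex.get(e.getKey()), sourceIndex)) {
                readersForSource.put(e.getKey(), e.getValue());
            }
        }
        return readersForSource;
    }
}
```

With value comparison, source index 200 matches correctly even though it lies outside the default Integer cache.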
[jira] [Updated] (FLINK-27529) HybridSourceSplitEnumerator sourceIndex using error Integer check
[ https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27529: - Fix Version/s: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785549#comment-17785549 ] Thomas Weise commented on FLINK-33402: -- [~varun1729dd], thanks for investigating this. It would be helpful to better understand why we see the race condition, as under the mailbox model this should not happen. Let's continue the discussion on the PR. > Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in > Data Loss > > > Key: FLINK-33402 > URL: https://issues.apache.org/jira/browse/FLINK-33402 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.1 > Environment: Apache Flink 1.16.1 > Mac OSX, Linux etc. >Reporter: Varun Narayanan Chakravarthy >Assignee: Varun Narayanan Chakravarthy >Priority: Critical > Labels: pull-request-available > Attachments: hybridSourceEnumeratorAndReaderFixes.patch > > Original Estimate: 2h > Remaining Estimate: 2h > > Hello Team, > I noticed that there is data loss when using the Hybrid Source. We are reading > from a series of ~100 concrete File Sources. All these locations are chained > together using the Hybrid Source. > The issue stems from a race condition in the Flink Hybrid Source code. The Hybrid > Source switches to the next source before the current source is complete. > The same applies to the Hybrid Source readers. I have also shared the patch file > that fixes the issue. 
> From the logs: > *Task Manager logs:* > 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding > split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, > 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 > 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek > policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - > Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished > reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher > for Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished > reading from splits [000154] 2023-10-10 17:46:24.014 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader > received NoMoreSplits event. 
2023-10-10 17:46:24.014 [Source: parquet-source > (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader > - No more splits for subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source > (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to > Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for > Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source > event: subtask=0 sourceIndex=12 > source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing > Source Reader. 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] > INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting > down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher > 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: > subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. > This is assigned to Reader with ID 000229. Now, we can see from the logs > this split is added after the no-more splits event and is NOT read. > *Job Manager logs:* > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote > sp
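The invariant the reporter describes is that a reader should not move on to the next source while a split assigned to it is still unread. A minimal sketch of that invariant as bookkeeping, assuming switching is allowed only after "no more splits" has arrived and every assigned split has finished. The class is hypothetical and only illustrates the condition; as Thomas notes, the mailbox model should already serialize these events, so this does not claim to locate the actual bug.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical per-reader bookkeeping for deciding when a source switch is safe. */
final class SourceSwitchGuard {
    private final Set<String> unfinishedSplitIds = new HashSet<>();
    private boolean noMoreSplits;

    void onSplitsAdded(String... splitIds) {
        Collections.addAll(unfinishedSplitIds, splitIds);
    }

    void onSplitFinished(String splitId) {
        unfinishedSplitIds.remove(splitId);
    }

    void onNoMoreSplits() {
        noMoreSplits = true;
    }

    /** Safe only when the enumerator is done AND nothing assigned is still unread. */
    boolean canSwitchToNextSource() {
        return noMoreSplits && unfinishedSplitIds.isEmpty();
    }
}
```

Replaying the task manager log with split IDs as sample data: split 000229 is added, 000154 finishes, and NoMoreSplits arrives; the guard still refuses to switch because 000229 has not been read.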
[jira] [Assigned] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-33402: Assignee: Varun Narayanan Chakravarthy > Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in > Data Loss > > > Key: FLINK-33402 > URL: https://issues.apache.org/jira/browse/FLINK-33402 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.1 > Environment: Apache Flink 1.16.1 > Mac OSX, Linux etc. >Reporter: Varun Narayanan Chakravarthy >Assignee: Varun Narayanan Chakravarthy >Priority: Critical > Labels: pull-request-available > Attachments: hybridSourceEnumeratorAndReaderFixes.patch > > Original Estimate: 2h > Remaining Estimate: 2h > > Hello Team, > I noticed that there is data loss when using the Hybrid Source. We are reading > from a series of ~100 concrete File Sources. All these locations are chained > together using the Hybrid Source. > The issue stems from a race condition in the Flink Hybrid Source code. The Hybrid > Source switches to the next source before the current source is complete. > The same applies to the Hybrid Source readers. I have also shared the patch file > that fixes the issue. 
> From the logs: > *Task Manager logs:* > 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding > split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, > 94451) hosts=[localhost] ID=000229 position=null] 2023-10-10 > 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek > policy 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - > Switching to Random IO seek policy 2023-10-10 17:46:24.012 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished > reading split(s) [000154] 2023-10-10 17:46:24.012 [Source Data Fetcher > for Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished > reading from splits [000154] 2023-10-10 17:46:24.014 [Source: > parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader > received NoMoreSplits event. 
2023-10-10 17:46:24.014 [Source: parquet-source > (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader > - No more splits for subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source > (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to > Random IO seek policy 2023-10-10 17:46:24.116 [Source Data Fetcher for > Source: parquet-source (1/2)#0|#0] INFO > org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source > event: subtask=0 sourceIndex=12 > source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e > 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO > o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing > Source Reader. 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] > INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting > down split fetcher 0 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: > parquet-source (1/2)#0|#0] INFO > o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher > 0 exited. 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG > o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: > subtask=0 sourceIndex=11 > currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8 > We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. > This is assigned to Reader with ID 000229. Now, we can see from the logs > this split is added after the no-more splits event and is NOT read. 
> *Job Manager logs:* > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO > o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote > split to requesting host '10': Optional[FileSourceSplit: > s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 > position=null] > 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-sour
[jira] [Resolved] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-32884. -- Resolution: Fixed > PyFlink remote execution should support URLs with paths and https scheme > > > Key: FLINK-32884 > URL: https://issues.apache.org/jira/browse/FLINK-32884 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Runtime / REST >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Currently, `SUBMIT_ARGS` uses the `remote -m http://<host>:<port>` format. For > local execution this works fine (`SUBMIT_ARGS=remote -m http://localhost:8081/`), > but it does not support placing the JobManager behind a proxy or > using an Ingress to route to a specific Flink cluster based on the URL > path. In the current scenario, the client expects to reach the JobManager > at the `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root > location such as > `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs` > is not supported. > This will use changes from > [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885). > Since RestClusterClient talks to the JobManager via its REST endpoint, the > right format for `SUBMIT_ARGS` is a URL with a path (with support for the https > scheme as well). -- This message was sent by Atlassian Jira (v8.20.10#820010)
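The issue asks that the submission target accept a base URL with a path prefix and an https scheme. A minimal sketch of combining such a base URL with a REST path using `java.net.URI` so that the prefix survives; the helper name and example host are hypothetical, and this is not the `RestClusterClient` implementation:

```java
import java.net.URI;

final class RestUrls {
    private RestUrls() {}

    /**
     * Appends a REST path such as "v1/jobs" to a base URL that may carry a
     * path prefix, keeping the scheme (http or https), host, port and prefix.
     */
    static URI endpoint(String baseUrl, String restPath) {
        URI base = URI.create(baseUrl);
        String prefix = base.getRawPath() == null ? "" : base.getRawPath();
        if (prefix.endsWith("/")) {
            prefix = prefix.substring(0, prefix.length() - 1);
        }
        String suffix = restPath.startsWith("/") ? restPath : "/" + restPath;
        // Resolving an absolute path keeps the scheme and authority of the base.
        // A plain base.resolve("v1/jobs") would drop the last prefix segment
        // whenever the base URL has no trailing slash.
        return base.resolve(prefix + suffix);
    }
}
```

For example, `endpoint("https://example.com:8443/flink-clusters/namespace/flink_job_deployment", "v1/jobs")` yields `https://example.com:8443/flink-clusters/namespace/flink_job_deployment/v1/jobs`, while `http://localhost:8081/` still maps to `http://localhost:8081/v1/jobs`.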
[jira] [Closed] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise closed FLINK-32884. Resolution: Fixed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
[ https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-32885: Assignee: Elkhan Dadashov > Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used > by RestClusterClient for PyFlink remote execution > - > > Key: FLINK-32885 > URL: https://issues.apache.org/jira/browse/FLINK-32885 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Table SQL / Gateway >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > > UrlPrefixDecorator is introduced in `flink-sql-gateway` module, which has > dependency on `flink-clients` module. RestClusterClient will also need to use > UrlPrefixDecorator for supporting PyFlink remote execution. Will refactor > related classes to achieve this. > I intend to change these classes in a backward compatible way > flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java > flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java > flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java > flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
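The decorator approach described here can be sketched as a wrapper that leaves the wrapped message headers untouched and only prepends a fixed path prefix to their target URL. This is a Python analogue for illustration only: `MessageHeaders` and this `UrlPrefixDecorator` are hypothetical stand-ins, not the Java classes listed in the ticket.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MessageHeaders:
    """Hypothetical stand-in for a REST message descriptor."""
    method: str
    target_url: str  # e.g. "/v1/jobs"


class UrlPrefixDecorator:
    """Wraps headers and prepends a fixed path prefix to the target URL."""

    def __init__(self, decorated: MessageHeaders, prefix: str):
        self._decorated = decorated
        stripped = prefix.strip("/")
        # Normalise "/a/b/" -> "/a/b"; an empty prefix stays empty.
        self._prefix = "/" + stripped if stripped else ""

    @property
    def method(self) -> str:
        return self._decorated.method

    @property
    def target_url(self) -> str:
        return self._prefix + self._decorated.target_url


jobs = MessageHeaders("POST", "/v1/jobs")
print(UrlPrefixDecorator(jobs, "/flink-clusters/ns/dep/").target_url)
```

Because existing header definitions are wrapped rather than modified, callers that use no prefix see exactly the old URLs, which fits the backward-compatibility goal stated in the ticket.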
[jira] [Assigned] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
[ https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-32884: Assignee: Elkhan Dadashov > PyFlink remote execution should support URLs with paths and https scheme > > > Key: FLINK-32884 > URL: https://issues.apache.org/jira/browse/FLINK-32884 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Runtime / REST >Affects Versions: 1.17.1 >Reporter: Elkhan Dadashov >Assignee: Elkhan Dadashov >Priority: Major > > Currently, the `SUBMIT_ARGS=remote -m http://:` format. For > local execution it works fine `SUBMIT_ARGS=remote -m > [http://localhost:8081|http://localhost:8081/]`, but it does not support the > placement of the JobManager behind a proxy or using an Ingress for routing to > a specific Flink cluster based on the URL path. In the current scenario, it > expects JobManager to access PyFlink jobs at `http://:/v1/jobs` > endpoint. Mapping to a non-root location, > `https://:/flink-clusters/namespace/flink_job_deployment/v1/jobs` > is not supported. > This will use changes from > [FLINK-32885](https://issues.apache.org/jira/browse/FLINK-32885) > Since RestClusterClient talks to the JobManager via its REST endpoint, the > right format for `SUBMIT_ARGS` is a URL with a path (also support for https > scheme). > I intend to change these classes in a backward-compatible way: > flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java > flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java > flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java > flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java > flink-core/src/main/java/org/apache/flink/util/NetUtils.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32035) SQL Client should support HTTPS with built-in JDK certificates
[ https://issues.apache.org/jira/browse/FLINK-32035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-32035. -- Fix Version/s: 1.18.0 Resolution: Implemented > SQL Client should support HTTPS with built-in JDK certificates > -- > > Key: FLINK-32035 > URL: https://issues.apache.org/jira/browse/FLINK-32035 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client, Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Related to FLINK-32030 > Internally SQL Client uses Flink’s _RestClient_ [1]. This client decides > whether to enable SSL not on the basis of the URL scheme > ([https://|https:]...), but based on Flink configuration, namely a global > _security.ssl.rest.enabled_ parameter [2] (which is also used for the REST > server-side configuration). When this parameter is set to true, it > automatically requires user-supplied _security.ssl.rest.truststore_ and > _security.ssl.rest.keystore_ to be configured - there is no default option to > use certificates from the JDK. After URL support for SQL Client gateway mode > (FLINK-32030) gets added, the SQL Client should automatically use > certificates built into the JDK unless user-supplied trust- and keystores > are configured. > [1] > [https://github.com/apache/flink/blob/5dddc0dba2be20806e67769314eecadf56b87a53/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java#L359] > [2] > [https://github.com/apache/flink/blob/5d9e63a16f079399c6b51547284bb96db0326bdb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java#L103] -- This message was sent by Atlassian Jira (v8.20.10#820010)
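The requested fallback can be sketched as follows, assuming SSL is switched on by the URL scheme rather than a global flag. Python's `ssl.create_default_context()` loads the runtime's default CA store, which plays the role of the JDK's built-in certificates here; `client_ssl_context` is a hypothetical helper, and reading a PEM CA bundle via `cafile` stands in for a Java truststore.

```python
import ssl
from typing import Optional


def client_ssl_context(url_scheme: str, truststore_path: Optional[str]) -> Optional[ssl.SSLContext]:
    """Return None for plain HTTP, otherwise a server-verifying context."""
    if url_scheme != "https":
        return None
    if truststore_path is not None:
        # User-supplied trust material wins (a PEM CA bundle in this sketch).
        return ssl.create_default_context(cafile=truststore_path)
    # No truststore configured: fall back to the platform's default CAs.
    return ssl.create_default_context()


ctx = client_ssl_context("https", None)
```

The default context still requires and verifies server certificates, so the fallback does not weaken security; it only removes the need to configure a truststore for publicly signed endpoints.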
[jira] [Resolved] (FLINK-32373) Support passing headers with SQL Client gateway requests
[ https://issues.apache.org/jira/browse/FLINK-32373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-32373. -- Fix Version/s: 1.18.0 Resolution: Fixed > Support passing headers with SQL Client gateway requests > > > Key: FLINK-32373 > URL: https://issues.apache.org/jira/browse/FLINK-32373 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client, Table SQL / Gateway >Affects Versions: 1.18.0 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > FLINK-32030 and FLINK-32035 enable communication from the SQL Client to the > SQL Gateway placed behind a proxy, such as a K8S ingress. Given that > authentication is typically needed in these cases, it can be achieved by > adding the ability to supply custom headers to the underlying RestClient. -- This message was sent by Atlassian Jira (v8.20.10#820010)
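A sketch of what "custom headers on gateway requests" amounts to, written with Python's standard `urllib.request` rather than Flink's RestClient. `build_gateway_request`, the gateway URL, and the bearer token are illustrative placeholders; the request is only built, never sent.

```python
import urllib.request
from typing import Mapping


def build_gateway_request(url: str, body: bytes, custom_headers: Mapping[str, str]) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying user-supplied headers."""
    req = urllib.request.Request(url, data=body, method="POST")
    req.add_header("Content-Type", "application/json")
    # Headers such as Authorization let a proxy/Ingress authenticate the client.
    for name, value in custom_headers.items():
        req.add_header(name, value)
    return req


req = build_gateway_request(
    "https://gateway.example.com/v1/sessions",  # placeholder endpoint
    b"{}",
    {"Authorization": "Bearer <token>"},
)
```

Keeping the headers as a plain name-to-value mapping means the client does not need to know which authentication scheme the proxy expects.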
[jira] [Assigned] (FLINK-32137) Flame graph is hard to use with many task managers
[ https://issues.apache.org/jira/browse/FLINK-32137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-32137: Assignee: Vladimir Matveev > Flame graph is hard to use with many task managers > -- > > Key: FLINK-32137 > URL: https://issues.apache.org/jira/browse/FLINK-32137 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.16.1 >Reporter: Vladimir Matveev >Assignee: Vladimir Matveev >Priority: Major > Attachments: image (1).png, image-2023-05-23-11-01-30-391.png > > > In case there are many task managers executing the same operator, the flame > graph becomes very hard to use. As you can see on the attached picture, it > considers instances of the same lambda function as different classes, and > their number seems to be equal to the number of task managers (i.e. each JVM > gets its own "class" name, which is expected for lambdas I guess). This > lambda function is deep within Flink's own call stack, so this kind of graph > is inevitable regardless of the job's own logic, and there is nothing we can > do at the job logic's level to fix it. > This behavior makes evaluating the flame graph very hard, because all of the > useful information gets "compressed" inside each "column" of the graph, and > at the same time, it does not give any useful information since this is just > an artifact of the class name generation in the JVM. -- This message was sent by Atlassian Jira (v8.20.10#820010)
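One possible way to merge these columns is to strip the per-JVM suffix from generated lambda class names before aggregating stack samples. The Python sketch below assumes frames are class-qualified strings in the sample formats shown in the comments; it does not describe how FLINK-32137 was actually resolved.

```python
import re
from collections import Counter
from typing import Iterable, Tuple

# Generated lambda classes carry per-JVM suffixes, e.g.
# "Foo$$Lambda$1234/0x0000000800c4b440" or "Foo$$Lambda$57/1690716179".
_LAMBDA_SUFFIX = re.compile(r"\$\$Lambda(?:\$\d+)?/(?:0x)?[0-9a-fA-F]+")


def normalize_frame(frame: str) -> str:
    """Replace the generated suffix so every JVM reports the same name."""
    return _LAMBDA_SUFFIX.sub("$$Lambda", frame)


def merge_stacks(samples: Iterable[Tuple[Tuple[str, ...], int]]) -> Counter:
    """Aggregate sampled stacks after normalizing lambda class names."""
    merged: Counter = Counter()
    for stack, count in samples:
        merged[tuple(normalize_frame(f) for f in stack)] += count
    return merged
```

After normalization, samples from different task managers that pass through the same lambda collapse into a single column, so the useful part of the graph is no longer split by JVM.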
[jira] [Commented] (FLINK-32109) Operator doesn't recognize JobManager stuck on volumeMount startup errors
[ https://issues.apache.org/jira/browse/FLINK-32109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723208#comment-17723208 ] Thomas Weise commented on FLINK-32109: -- [~gyfora] if the issue can be corrected externally and eventually the deployment can transition into running state without intelligence in the operator, then it is probably best to just wait? What would be useful though is to bubble up the event to the flinkdeployment level. Similar to genuine errors that we already interpret, this would require special logic to recognize the specific condition. That needs to be added on a best-effort basis and I think that is OK since it is mostly for convenience (saving the client from digging into the lower-level resources). > Operator doesn't recognize JobManager stuck on volumeMount startup errors > - > > Key: FLINK-32109 > URL: https://issues.apache.org/jira/browse/FLINK-32109 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.6.0 >Reporter: Gyula Fora >Priority: Major > > Currently the flink deployment observer logic only reacts to Deployment > conditions such as failure to create the JM pod, image pull errors etc. > Pod startup errors such as volumeMount are not recognized as errors and the > operator keeps waiting for it indefinitely. > This is a tricky problem because volumeMount errors can be transient and are > only reported as Events for the pod so I am not completely sure if we can do > anything about this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
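The best-effort "bubble up" idea can be sketched as a filter over the pod's Kubernetes events that surfaces known volume problems without treating them as fatal. `PodEvent` and `events_to_surface` are hypothetical; the reasons `FailedMount` and `FailedAttachVolume` are event reasons the kubelet and attach controller emit, but the choice of which reasons to surface is an assumption, not operator behaviour.

```python
from dataclasses import dataclass
from typing import Iterable, List

# Assumed, best-effort list of reasons worth surfacing on the FlinkDeployment.
VOLUME_EVENT_REASONS = {"FailedMount", "FailedAttachVolume"}


@dataclass(frozen=True)
class PodEvent:
    type: str  # "Normal" or "Warning"
    reason: str
    message: str


def events_to_surface(events: Iterable[PodEvent]) -> List[str]:
    """Pick warning events that explain a JobManager pod stuck in startup.

    They are only reported, not treated as fatal, because volume errors
    can be transient and may resolve without operator intervention.
    """
    return [
        f"{e.reason}: {e.message}"
        for e in events
        if e.type == "Warning" and e.reason in VOLUME_EVENT_REASONS
    ]
```

Reporting instead of failing matches the comment's suggestion to keep waiting while giving the client the reason without inspecting lower-level resources.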
[jira] [Updated] (FLINK-32030) SQL Client gateway mode should accept URLs
[ https://issues.apache.org/jira/browse/FLINK-32030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-32030: - Fix Version/s: 1.18.0 > SQL Client gateway mode should accept URLs > -- > > Key: FLINK-32030 > URL: https://issues.apache.org/jira/browse/FLINK-32030 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client, Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Currently, the _--endpoint_ parameter has to be specified in the > _InetSocketAddress_ format, i.e. _hostname:port._ While this works fine for > basic use cases, it does not support the placement of the gateway behind a > proxy or using an Ingress for routing to a specific Flink cluster based on > the URL path. I.e. it expects > _[some.hostname.com:9001|http://some.hostname.com:9001/]_ to directly serve > requests on _[some.hostname.com:9001/v1|http://some.hostname.com:9001/v1]_ . > Mapping to a non-root location, i.e. > _[some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1|http://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1]_ > is not supported. > > Since the client talks to the gateway via its REST endpoint, the right format > for the _--endpoint_ parameter is {_}URL{_}, not _InetSocketAddress_ . > The same _--endpoint_ parameter can be reused if the changes are implemented > in a backwards-compatible way. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32030) SQL Client gateway mode should accept URLs
[ https://issues.apache.org/jira/browse/FLINK-32030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-32030. -- Resolution: Fixed > SQL Client gateway mode should accept URLs > -- > > Key: FLINK-32030 > URL: https://issues.apache.org/jira/browse/FLINK-32030 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client, Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > > Currently, the _--endpoint_ parameter has to be specified in the > _InetSocketAddress_ format, i.e. _hostname:port._ While this works fine for > basic use cases, it does not support the placement of the gateway behind a > proxy or using an Ingress for routing to a specific Flink cluster based on > the URL path. I.e. it expects > _[some.hostname.com:9001|http://some.hostname.com:9001/]_ to directly serve > requests on _[some.hostname.com:9001/v1|http://some.hostname.com:9001/v1]_ . > Mapping to a non-root location, i.e. > _[some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1|http://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1]_ > is not supported. > > Since the client talks to the gateway via its REST endpoint, the right format > for the _--endpoint_ parameter is {_}URL{_}, not _InetSocketAddress_ . > The same _--endpoint_ parameter can be reused if the changes are implemented > in a backwards-compatible way. -- This message was sent by Atlassian Jira (v8.20.10#820010)
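A backward-compatible reading of `--endpoint` can be sketched by treating any value without a scheme as the legacy `hostname:port` form. `parse_endpoint` is a hypothetical helper, and defaulting legacy values to plain `http` is an assumption made for this sketch.

```python
from urllib.parse import urlsplit


def parse_endpoint(value: str) -> str:
    """Accept the legacy 'host:port' form or a full http(s) URL; return a base URL."""
    if "://" not in value:
        # Legacy InetSocketAddress-style value; assumed to mean plain http.
        value = "http://" + value
    parts = urlsplit(value)
    if parts.scheme not in ("http", "https") or not parts.hostname:
        raise ValueError(f"invalid endpoint: {value!r}")
    _ = parts.port  # raises ValueError if the port is not numeric
    # Keep the path so a non-root Ingress location survives.
    return f"{parts.scheme}://{parts.netloc}{parts.path.rstrip('/')}"


print(parse_endpoint("some.hostname.com:9001"))
print(parse_endpoint("https://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/"))
```

Existing `hostname:port` invocations keep their meaning, while URLs with a path prefix become possible through the same parameter.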
[jira] [Commented] (FLINK-31997) Update to Fabric8 6.5.1+ in flink-kubernetes
[ https://issues.apache.org/jira/browse/FLINK-31997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720364#comment-17720364 ] Thomas Weise commented on FLINK-31997: -- [~gyfora] would it be possible to also shade the fabric8 dependency that goes into flink-kubernetes.jar? Otherwise there is the possibility of running into class path conflicts when the application also has a fabric8 dependency, as I have seen in at least one case. > Update to Fabric8 6.5.1+ in flink-kubernetes > > > Key: FLINK-31997 > URL: https://issues.apache.org/jira/browse/FLINK-31997 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > > We should update the fabric8 version in flink-kubernetes to at least 6.5.1. > Flink currently uses a very old fabric8 version. The fabric8 library > dependencies have since been revised and greatly improved to make them more > modular and allow eliminating security vulnerabilities more easily, for example: > https://issues.apache.org/jira/browse/FLINK-31815 > The newer versions, especially 6.5.1+, also add some stability improvements and > fixes for watches and other parts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31974) JobManager crashes after KubernetesClientException exception with FatalExitExceptionHandler
[ https://issues.apache.org/jira/browse/FLINK-31974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719355#comment-17719355 ] Thomas Weise commented on FLINK-31974: -- There are many cases where errors are transient. This specific case is actually quite obvious: the resource availability on a large cluster is changing constantly. A pod may not be schedulable now but may be a few seconds later. Other k8s-related issues can also be transient; for example, a request that failed due to rate limiting will likely succeed soon after, and we would actually make things worse by not following a backoff/retry strategy and simply letting the job fail. I'm also leaning towards a retry-by-default strategy and explicitly identifying the cases that should be fatal errors. > JobManager crashes after KubernetesClientException exception with > FatalExitExceptionHandler > --- > > Key: FLINK-31974 > URL: https://issues.apache.org/jira/browse/FLINK-31974 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.17.0 >Reporter: Sergio Sainz >Assignee: Weijie Guo >Priority: Major > > When resource quota limit is reached JobManager will throw > > org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: > Failure executing: POST at: > https://10.96.0.1/api/v1/namespaces/my-namespace/pods. Message: > Forbidden!Configured service account doesn't have access. Service account may > have been revoked. pods "my-namespace-flink-cluster-taskmanager-1-2" is > forbidden: exceeded quota: my-namespace-resource-quota, requested: > limits.cpu=3, used: limits.cpu=12100m, limited: limits.cpu=13. 
> > In {*}1.16.1 , this is handled gracefully{*}: > {code} > 2023-04-28 22:07:24,631 WARN > org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - > Failed requesting worker with resource spec WorkerResourceSpec > \{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 > bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb > (241591914 bytes), numSlots=4}, current pending count: 0 > java.util.concurrent.CompletionException: > io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: > POST at: https://10.96.0.1/api/v1/namespaces/my-namespace/pods. Message: > Forbidden!Configured service account doesn't have access. Service account may > have been revoked. pods "my-namespace-flink-cluster-taskmanager-1-138" is > forbidden: exceeded quota: my-namespace-resource-quota, requested: > limits.cpu=3, used: limits.cpu=12100m, limited: limits.cpu=13. > at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture.completeThrowable(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown > Source) ~[?:?] > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > ~[?:?] > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > ~[?:?] > at java.lang.Thread.run(Unknown Source) ~[?:?] > Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure > executing: POST at: https://10.96.0.1/api/v1/namespaces/my-namespace/pods. > Message: Forbidden!Configured service account doesn't have access. Service > account may have been revoked. pods > "my-namespace-flink-cluster-taskmanager-1-138" is forbidden: exceeded quota: > my-namespace-resource-quota, requested: limits.cpu=3, used: > limits.cpu=12100m, limited: limits.cpu=13. 
> at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:684) > ~[flink-dist-1.16.1.jar:1.16.1] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:664) > ~[flink-dist-1.16.1.jar:1.16.1] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:613) > ~[flink-dist-1.16.1.jar:1.16.1] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:558) > ~[flink-dist-1.16.1.jar:1.16.1] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:521) > ~[flink-dist-1.16.1.jar:1.16.1] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:308) > ~[flink-dist-1.16.1.jar:1.16.1] > at > io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:644) > ~[flink-dist-1.16.1.jar:1.16.1] > at > io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:83) > ~[flink-dist-1.16.1.jar:1
[jira] [Commented] (FLINK-30859) Remove flink-connector-kafka from master branch
[ https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704157#comment-17704157 ] Thomas Weise commented on FLINK-30859: -- The examples would lead to a chicken and egg problem: We would first need to release core, then the connector based on that core release, then the examples based on the connector release. That seems to suggest that the Kafka example should also live in the Kafka connector repo? > Remove flink-connector-kafka from master branch > --- > > Key: FLINK-30859 > URL: https://issues.apache.org/jira/browse/FLINK-30859 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Affects Versions: 1.18.0 >Reporter: Mason Chen >Priority: Major > > Remove flink-connector-kafka from master branch since the repo has now been > externalized and 1.17 commits have been sync'ed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31305) KafkaWriter doesn't wait for errors for in-flight records before completing flush
[ https://issues.apache.org/jira/browse/FLINK-31305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-31305: - Issue Type: Bug (was: Improvement) > KafkaWriter doesn't wait for errors for in-flight records before completing > flush > - > > Key: FLINK-31305 > URL: https://issues.apache.org/jira/browse/FLINK-31305 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0, 1.16.1 >Reporter: Mason Chen >Assignee: Mason Chen >Priority: Major > Fix For: 1.17.0 > > > The KafkaWriter flushing needs to wait for all in-flight records to send > successfully. This can be achieved by tracking requests and returning a > response from the registered callback from the producer#send() logic. > There is potential for data loss since the checkpoint does not accurately > reflect that all records have been sent successfully, to preserve at least > once semantics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
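The tracking scheme described in the issue (count requests, complete them from the send callback, make flush wait and surface errors) can be sketched in Python as below. This is not the KafkaWriter code: `send_async` is a hypothetical stand-in for `producer.send()` that must invoke its callback exactly once with `None` or an exception.

```python
import threading
from typing import Callable, Optional

Callback = Callable[[Optional[Exception]], None]


class TrackingWriter:
    """Counts in-flight sends and remembers the first asynchronous error."""

    def __init__(self, send_async: Callable[[bytes, Callback], None]):
        self._send_async = send_async
        self._cond = threading.Condition()
        self._pending = 0
        self._error: Optional[Exception] = None

    def write(self, record: bytes) -> None:
        with self._cond:
            self._pending += 1
        # Called outside the lock so a synchronous callback cannot deadlock.
        self._send_async(record, self._on_complete)

    def _on_complete(self, error: Optional[Exception]) -> None:
        with self._cond:
            self._pending -= 1
            if error is not None and self._error is None:
                self._error = error
            self._cond.notify_all()

    @property
    def pending(self) -> int:
        with self._cond:
            return self._pending

    def flush(self) -> None:
        """Block until every in-flight record is acknowledged, then fail on errors."""
        with self._cond:
            self._cond.wait_for(lambda: self._pending == 0)
            if self._error is not None:
                raise RuntimeError("record failed before checkpoint") from self._error
```

Because `flush` only returns once every callback has fired and no error was recorded, a checkpoint that calls it cannot complete while records are still unacknowledged, which is the at-least-once guarantee the issue asks for.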
[jira] [Assigned] (FLINK-31305) KafkaWriter doesn't wait for errors for in-flight records before completing flush
[ https://issues.apache.org/jira/browse/FLINK-31305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-31305: Assignee: Mason Chen > KafkaWriter doesn't wait for errors for in-flight records before completing > flush > - > > Key: FLINK-31305 > URL: https://issues.apache.org/jira/browse/FLINK-31305 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.17.0, 1.16.1 >Reporter: Mason Chen >Assignee: Mason Chen >Priority: Major > Fix For: 1.17.0 > > > The KafkaWriter flushing needs to wait for all in-flight records to send > successfully. This can be achieved by tracking requests and returning a > response from the registered callback from the producer#send() logic. > There is potential for data loss since the checkpoint does not accurately > reflect that all records have been sent successfully, to preserve at least > once semantics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-22793) HybridSource Table Implementation
[ https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693339#comment-17693339 ] Thomas Weise commented on FLINK-22793: -- [~nicholasjiang] thanks for the confirmation! > HybridSource Table Implementation > - > > Key: FLINK-22793 > URL: https://issues.apache.org/jira/browse/FLINK-22793 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Nicholas Jiang >Assignee: Ran Tao >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-22793) HybridSource Table Implementation
[ https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-22793: Assignee: Ran Tao (was: Nicholas Jiang) > HybridSource Table Implementation > - > > Key: FLINK-22793 > URL: https://issues.apache.org/jira/browse/FLINK-22793 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Nicholas Jiang >Assignee: Ran Tao >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-22793) HybridSource Table Implementation
[ https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686606#comment-17686606 ] Thomas Weise commented on FLINK-22793: -- [~lemonjing] given the long time that has passed and all the FLIP work you have been driving, I think it is time to reassign this ticket to you. Let's wait a couple more days and see if [~nicholasjiang] can confirm. > HybridSource Table Implementation > - > > Key: FLINK-22793 > URL: https://issues.apache.org/jira/browse/FLINK-22793 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Nicholas Jiang >Assignee: Nicholas Jiang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30858) Kubernetes operator does not update reconciled generation
[ https://issues.apache.org/jira/browse/FLINK-30858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682798#comment-17682798 ] Thomas Weise commented on FLINK-30858: -- https://lists.apache.org/thread/8y1zp4ogssy8ltsl42ppzvbo64dlzc3v > Kubernetes operator does not update reconciled generation > - > > Key: FLINK-30858 > URL: https://issues.apache.org/jira/browse/FLINK-30858 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.3.1 >Reporter: Thomas Weise >Priority: Major > > Kubernetes manages the generation field as part of the spec metadata. It will > be increased when changes are made to the resource. The counterpart in status > is "observed generation", provided by a controller. By comparing the two, the > client can determine that the controller has processed the spec and, in > conjunction with other status information, conclude that a change has been > reconciled. > The Flink operator currently tracks the generation as part of reconciled and > stable specs but these cannot be used as "observed generation" to perform the > check. The value isn't updated in cases where the operator determines that there > are no changes to the spec that require deployment. This can be reproduced > through PUT/replace with the same spec or a change in upgrade mode. > The operator should provide the observed spec, which in conjunction with > deployment state can then be used by clients to determine that the spec has > been reconciled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
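The client-side check described in the issue can be sketched as a comparison of `metadata.generation` with `status.observedGeneration`, the conventional Kubernetes field name. The Flink operator did not expose that field at the time of this ticket, so the sketch assumes it exists; `spec_reconciled` is a hypothetical helper operating on the resource as a plain dict.

```python
from typing import Any, Mapping


def spec_reconciled(resource: Mapping[str, Any]) -> bool:
    """True once the controller has observed the latest spec generation.

    Assumes the controller writes status.observedGeneration; a real client
    would also check deployment state before concluding the change is live.
    """
    generation = (resource.get("metadata") or {}).get("generation")
    observed = (resource.get("status") or {}).get("observedGeneration")
    return generation is not None and observed is not None and observed >= generation
```

The check only works if the controller bumps the observed value even when it decides no redeployment is needed, which is exactly the gap the issue points out.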
[jira] [Created] (FLINK-30858) Kubernetes operator does not update reconciled generation
Thomas Weise created FLINK-30858: Summary: Kubernetes operator does not update reconciled generation Key: FLINK-30858 URL: https://issues.apache.org/jira/browse/FLINK-30858 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.3.1 Reporter: Thomas Weise Kubernetes manages the generation field as part of the spec metadata. It will be increased when changes are made to the resource. The counterpart in status is "observed generation", provided by a controller. By comparing the two, the client can determine that the controller has processed the spec and, in conjunction with other status information, conclude that a change has been reconciled. The Flink operator currently tracks the generation as part of reconciled and stable specs but these cannot be used as "observed generation" to perform the check. The value isn't updated in cases where the operator determines that there are no changes to the spec that require deployment. This can be reproduced through PUT/replace with the same spec or a change in upgrade mode. The operator should provide the observed spec, which in conjunction with deployment state can then be used by clients to determine that the spec has been reconciled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode
[ https://issues.apache.org/jira/browse/FLINK-29109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-29109. -- Resolution: Fixed > Checkpoint path conflict with stateless upgrade mode > > > Key: FLINK-29109 > URL: https://issues.apache.org/jira/browse/FLINK-29109 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.3.0, kubernetes-operator-1.2.0 > > > A stateful job with stateless upgrade mode (yes, there are such use cases) > fails with a checkpoint path conflict due to a constant jobId and FLINK-19358 > (applies to Flink < 1.16). Since with stateless upgrade mode the checkpoint > id resets on restart, the job is going to write to previously used locations > and fail. The workaround is to rotate the jobId on every redeploy when the > upgrade mode is stateless. While this can be worked around externally, it is > best done in the operator itself because reconciliation determines when a > restart is actually required, whereas rotating the jobId externally may trigger > unnecessary restarts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode
[ https://issues.apache.org/jira/browse/FLINK-29109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-29109: - Fix Version/s: kubernetes-operator-1.3.0 > Checkpoint path conflict with stateless upgrade mode > > > Key: FLINK-29109 > URL: https://issues.apache.org/jira/browse/FLINK-29109 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0, kubernetes-operator-1.3.0 > > > A stateful job with stateless upgrade mode (yes, there are such use cases) > fails with a checkpoint path conflict due to a constant jobId and FLINK-19358 > (applies to Flink < 1.16). Since with stateless upgrade mode the checkpoint > id resets on restart, the job is going to write to previously used locations > and fail. The workaround is to rotate the jobId on every redeploy when the > upgrade mode is stateless. While this can be worked around externally, it is > best done in the operator itself because reconciliation determines when a > restart is actually required, whereas rotating the jobId externally may trigger > unnecessary restarts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode
[ https://issues.apache.org/jira/browse/FLINK-29109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642849#comment-17642849 ] Thomas Weise commented on FLINK-29109: -- [~gyfora] thanks for catching this. Because the jobId assigned by Flink is deterministic (HighAvailabilityOptions.HA_CLUSTER_ID), we will also need to apply the random jobId for stateless upgrade mode for Flink versions >= 1.16 to avoid the checkpoint path collisions. https://github.com/apache/flink/blob/e70fe68dea764606180ca3728184c00fc63ea0ff/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L227 > Checkpoint path conflict with stateless upgrade mode > > > Key: FLINK-29109 > URL: https://issues.apache.org/jira/browse/FLINK-29109 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > A stateful job with stateless upgrade mode (yes, there are such use cases) > fails with a checkpoint path conflict due to a constant jobId and FLINK-19358 > (applies to Flink < 1.16). Since with stateless upgrade mode the checkpoint > id resets on restart, the job is going to write to previously used locations > and fail. The workaround is to rotate the jobId on every redeploy when the > upgrade mode is stateless. While this can be worked around externally, it is > best done in the operator itself because reconciliation determines when a > restart is actually required, whereas rotating the jobId externally may trigger > unnecessary restarts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
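The rotation rule can be sketched as a small decision made during reconciliation. Checkpoints are written under `<checkpoint-dir>/<job-id>/chk-<n>`, so a fresh job ID gives a restarted stateless job a clean directory even though its checkpoint counter starts over. `job_id_for_deploy` is a hypothetical helper; the upgrade mode strings follow the operator's `upgradeMode` values, and 16 random bytes mirror the size of a Flink JobID.

```python
import secrets
from typing import Optional


def job_id_for_deploy(upgrade_mode: str, restart_required: bool, current_job_id: Optional[str]) -> Optional[str]:
    """Choose the job ID for the next deployment.

    Only a real restart in stateless mode gets a new random ID; otherwise
    the current ID is kept so no extra restart is triggered.
    """
    if upgrade_mode == "stateless" and restart_required:
        return secrets.token_hex(16)  # 32 hex chars, same size as a JobID
    return current_job_id
```

Tying the rotation to `restart_required` reflects the ticket's argument for doing this in the operator: only reconciliation knows when a restart will actually happen.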
[jira] [Commented] (FLINK-30266) Recovery reconciliation loop fails if no checkpoint has been created yet
[ https://issues.apache.org/jira/browse/FLINK-30266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642133#comment-17642133 ] Thomas Weise commented on FLINK-30266: -- I believe this was discussed before, and the reason we decided not to allow this was that we cannot safely determine why the HA metadata is missing: it could be because there was never a successful checkpoint, or because it was removed by mistake. As long as we can ensure that we don't accidentally reset a job with prior state to empty state, I would also prefer the solution that does not involve manual intervention. > Recovery reconciliation loop fails if no checkpoint has been created yet > > > Key: FLINK-30266 > URL: https://issues.apache.org/jira/browse/FLINK-30266 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.3.0 >Reporter: Maximilian Michels >Assignee: Gyula Fora >Priority: Blocker > Labels: pull-request-available > Fix For: kubernetes-operator-1.3.0 > > > When the upgradeMode is LAST-STATE, the operator fails to reconcile a failed > application unless at least one checkpoint has already been created. The > expected behavior would be that the job starts with empty state. > {noformat} > 2022-12-01 10:58:35,596 o.a.f.k.o.l.AuditUtils [INFO ] [app] >>> > Status | Error | UPGRADING | > {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA > metadata not available to restore from last state. It is possible that the > job has finished or terminally failed, or the configmaps have been deleted. > Manual restore > required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]} > {noformat} > {noformat} > 2022-12-01 10:44:49,480 i.j.o.p.e.ReconciliationDispatcher [ERROR] [app] > Error during event processing ExecutionScope{ resource id: > ResourceID{name='app', namespace='namespace'}, version: 216933301} failed. 
> org.apache.flink.kubernetes.operator.exception.ReconciliationException: > java.lang.RuntimeException: This indicates a bug... > at > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133) > at > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54) > at > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136) > at > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94) > at > org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80) > at > io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93) > at > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130) > at > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110) > at > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81) > at > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54) > at > io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: java.lang.RuntimeException: This indicates a bug... 
> at > org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:61) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:212) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:144) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:167) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:64) > at > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(
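The trade-off in the FLINK-30266 comment can be expressed as a small decision function. This is a hypothetical sketch, not the fix that was merged: it assumes the operator has some trustworthy signal (`everRecordedState`) telling whether any checkpoint or savepoint was ever recorded for the resource, and it starts from empty state only when that signal says none exists.

```java
/** Possible outcomes when restoring a LAST-STATE deployment (hypothetical names). */
enum RestoreDecision { RESTORE_FROM_HA, START_EMPTY, MANUAL_RESTORE_REQUIRED }

final class LastStateRestorePolicy {
    private LastStateRestorePolicy() {}

    /**
     * @param haMetadataPresent whether HA metadata to restore from is available
     * @param everRecordedState assumed signal: true if any checkpoint or savepoint was
     *                          ever recorded for this resource
     */
    static RestoreDecision decide(boolean haMetadataPresent, boolean everRecordedState) {
        if (haMetadataPresent) {
            return RestoreDecision.RESTORE_FROM_HA;
        }
        if (!everRecordedState) {
            // No prior state can be lost, so starting empty is safe.
            return RestoreDecision.START_EMPTY;
        }
        // Metadata is gone although state existed: never silently reset to empty state.
        return RestoreDecision.MANUAL_RESTORE_REQUIRED;
    }
}
```

The hard part is obtaining `everRecordedState` reliably; if it cannot be trusted, a manual restore remains the safe default.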
[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering
[ https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641610#comment-17641610 ] Thomas Weise commented on FLINK-29634: -- [~pnowojski] please note that this ticket aims to add periodic triggering to the flink-kubernetes-operator, not to core Flink. Periodic triggering of savepoints (the existing feature) fits well into the charter of flink-kubernetes-operator. Based on FLINK-27101, it seems straightforward to extend it so that the operator can also trigger full snapshots that can be used for recovery. It may even help bridge the changed semantics of intermediate savepoints from FLINK-29856. > Support periodic checkpoint triggering > -- > > Key: FLINK-29634 > URL: https://issues.apache.org/jira/browse/FLINK-29634 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Jiale Tan >Priority: Major > > Similar to the support for periodic savepoints, the operator should support > triggering periodic checkpoints to break the incremental checkpoint chain. > Support for external triggering will come with 1.17: > https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29099) Deadlock for Single Subtask in Kinesis Consumer
[ https://issues.apache.org/jira/browse/FLINK-29099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640298#comment-17640298 ] Thomas Weise commented on FLINK-29099: -- [~sethsaperstein] thanks for the thorough investigation and fix! > Deadlock for Single Subtask in Kinesis Consumer > --- > > Key: FLINK-29099 > URL: https://issues.apache.org/jira/browse/FLINK-29099 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.9.3, 1.10.3, 1.11.6, 1.12.7, 1.13.6, 1.14.5, 1.15.3 >Reporter: seth saperstein >Assignee: seth saperstein >Priority: Minor > Labels: connector, consumer, kinesis, pull-request-available > Fix For: 1.17.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Deadlock is reached as the result of: > * max lookahead reached for local watermark > * idle state for subtask > The lookahead prevents the RecordEmitter from emitting a new record. The idle > state prevents the global watermark from being updated. > To exit this deadlock state, we need to complete the [TODO > here|https://github.com/apache/flink/blob/221d70d9930f72147422ea24b399f006ebbfb8d7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1268] > which updates the global watermark while the subtask is marked idle, which > will then allow us to emit a record again as the lookahead is no longer > reached. > > *Context:* > We reached this scenario at Lyft as a result of prolonged CPU throttling on > all FlinkKinesisConsumer threads for multiple minutes. 
> Walking through the series of events for a single subtask: > * prolonged CPU throttling occurs and no logs are seen from any > FlinkKinesisConsumer thread for up to 15 minutes > * after CPU throttling the subtask is marked idle > * the subtask has reached the lookahead for its local watermark relative to > the global watermark > * WatermarkSyncCallback indicates the subtask as idle and does not update > the global watermark > * emitQueue fills to max > * RecordEmitter cannot emit records due to the max lookahead > * Deadlock on subtask > At this point, we had not realized what had happened and processing of all > other shards/subtasks had continued for multiple days. When we finally > restarted the application, we saw the following behavior: > * global watermark recalculated after all subtasks consumed data based on > the last kinesis record sequence number > * global watermark moved back in time multiple days, to when the subtask was > first marked idle > * the single subtask processed data while all others remained idle due to > the lookahead > This would have continued until the subtask had caught up to the others and > thus the global watermark is within reach of the lookahead for other subtasks. > > *Repro:* > Too difficult to repro the exact scenario. -- This message was sent by Atlassian Jira (v8.20.10#820010)
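The FLINK-29099 deadlock can be reproduced in a toy model. This is not the `KinesisDataFetcher` code; `LookaheadEmitter`, its fields and the `updateGlobalWhileIdle` switch are hypothetical names. It assumes a record may only be emitted while its timestamp is within `lookaheadMillis` of the last known global watermark, and that the periodic sync callback is the only way that global watermark advances. With `updateGlobalWhileIdle = false` (the behavior described in the ticket) an idle subtask never sees the global watermark move and cannot emit again; with `true` (the direction of the fix) it recovers.

```java
/** Toy model of one consumer subtask throttled by a watermark lookahead. */
final class LookaheadEmitter {
    private final long lookaheadMillis;
    private final boolean updateGlobalWhileIdle; // false models the bug, true the fix
    private long globalWatermark;
    private boolean idle;

    LookaheadEmitter(long lookaheadMillis, long initialGlobalWatermark, boolean updateGlobalWhileIdle) {
        this.lookaheadMillis = lookaheadMillis;
        this.globalWatermark = initialGlobalWatermark;
        this.updateGlobalWhileIdle = updateGlobalWhileIdle;
    }

    void setIdle(boolean idle) {
        this.idle = idle;
    }

    /** Periodic sync callback delivering the latest global watermark. */
    void onWatermarkSync(long latestGlobalWatermark) {
        if (idle && !updateGlobalWhileIdle) {
            return; // global watermark stays stale while the subtask is idle
        }
        globalWatermark = Math.max(globalWatermark, latestGlobalWatermark);
    }

    /** A record may be emitted only if it lies within the lookahead window. */
    boolean canEmit(long recordTimestamp) {
        return recordTimestamp <= globalWatermark + lookaheadMillis;
    }
}
```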
[jira] [Resolved] (FLINK-29099) Deadlock for Single Subtask in Kinesis Consumer
[ https://issues.apache.org/jira/browse/FLINK-29099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-29099. -- Fix Version/s: 1.17.0 Resolution: Fixed > Deadlock for Single Subtask in Kinesis Consumer > --- > > Key: FLINK-29099 > URL: https://issues.apache.org/jira/browse/FLINK-29099 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.9.3, 1.10.3, 1.11.6, 1.12.7, 1.13.6, 1.14.5, 1.15.3 >Reporter: seth saperstein >Priority: Minor > Labels: connector, consumer, kinesis, pull-request-available > Fix For: 1.17.0 > > Original Estimate: 48h > Remaining Estimate: 48h
[jira] [Assigned] (FLINK-29099) Deadlock for Single Subtask in Kinesis Consumer
[ https://issues.apache.org/jira/browse/FLINK-29099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-29099: Assignee: seth saperstein > Deadlock for Single Subtask in Kinesis Consumer > --- > > Key: FLINK-29099 > URL: https://issues.apache.org/jira/browse/FLINK-29099 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.9.3, 1.10.3, 1.11.6, 1.12.7, 1.13.6, 1.14.5, 1.15.3 >Reporter: seth saperstein >Assignee: seth saperstein >Priority: Minor > Labels: connector, consumer, kinesis, pull-request-available > Fix For: 1.17.0 > > Original Estimate: 48h > Remaining Estimate: 48h
[jira] [Resolved] (FLINK-12675) Event time synchronization in Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-12675. -- Resolution: Won't Fix > Event time synchronization in Kafka consumer > > > Key: FLINK-12675 > URL: https://issues.apache.org/jira/browse/FLINK-12675 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Thomas Weise >Priority: Major > Labels: auto-unassigned, pull-request-available > Attachments: 0001-Kafka-event-time-alignment.patch > > > Integrate the source watermark tracking into the Kafka consumer and implement > the sync mechanism (different consumer model, compared to Kinesis). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-10886) Event time synchronization across sources
[ https://issues.apache.org/jira/browse/FLINK-10886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-10886. -- Resolution: Done > Event time synchronization across sources > - > > Key: FLINK-10886 > URL: https://issues.apache.org/jira/browse/FLINK-10886 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Jamie Grier >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > auto-unassigned > Original Estimate: 336h > Remaining Estimate: 336h > > When reading from a source with many parallel partitions, especially when > reading lots of historical data (or recovering from downtime and there is a > backlog to read), it's quite common for an event-time skew to develop > across those partitions. > > When doing event-time windowing -- or in fact any event-time driven > processing -- the event time skew across partitions results directly in > increased buffering in Flink and of course the corresponding state/checkpoint > size growth. > > As the event-time skew and state size grow larger, this can have a major > effect on application performance and in some cases result in a "death > spiral" where the application performance gets worse and worse as the state > size grows and grows. > > So, one solution to this problem, outside of core changes in Flink itself, > seems to be to try to coordinate sources across partitions so that they make > progress through event time at roughly the same rate. In fact, if there is > large skew, the idea would be to slow or even stop reading from some > partitions with newer data while first reading the partitions with older > data. Anyway, to do this we need to share state somehow amongst sub-tasks. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
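The coordination idea from FLINK-10886, slowing or stopping the partitions that are furthest ahead in event time, can be sketched as a pure function over per-partition watermarks. This is an illustrative sketch with hypothetical names (`EventTimeAligner`, `partitionsToPause`, `maxSkewMillis`); it leaves out the hard part the ticket points to, namely sharing these watermarks among subtasks, and assumes all watermarks are already known in one place.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class EventTimeAligner {
    private EventTimeAligner() {}

    /**
     * Returns the partitions whose watermark is more than maxSkewMillis ahead of the
     * slowest partition; a reader would stop fetching from them until the laggards catch up.
     */
    static Set<String> partitionsToPause(Map<String, Long> partitionWatermarks, long maxSkewMillis) {
        Set<String> paused = new TreeSet<>();
        if (partitionWatermarks.isEmpty()) {
            return paused;
        }
        long slowest = Collections.min(partitionWatermarks.values());
        for (Map.Entry<String, Long> entry : partitionWatermarks.entrySet()) {
            if (entry.getValue() - slowest > maxSkewMillis) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }
}
```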
[jira] [Commented] (FLINK-12675) Event time synchronization in Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638662#comment-17638662 ] Thomas Weise commented on FLINK-12675: -- Partition/split level alignment is supported with the new KafkaSource: https://issues.apache.org/jira/browse/FLINK-28853 > Event time synchronization in Kafka consumer > > > Key: FLINK-12675 > URL: https://issues.apache.org/jira/browse/FLINK-12675 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Thomas Weise >Priority: Major > Labels: auto-unassigned, pull-request-available > Attachments: 0001-Kafka-event-time-alignment.patch > > > Integrate the source watermark tracking into the Kafka consumer and implement > the sync mechanism (different consumer model, compared to Kinesis). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30004) Cannot resume deployment after suspend with savepoint due to leftover configmaps
[ https://issues.apache.org/jira/browse/FLINK-30004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-30004. -- Fix Version/s: kubernetes-operator-1.3.0 Resolution: Fixed > Cannot resume deployment after suspend with savepoint due to leftover > configmaps > > > Key: FLINK-30004 > URL: https://issues.apache.org/jira/browse/FLINK-30004 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.2.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.3.0 > > > Due to the possibility of incomplete cleanup of HA data in Flink 1.14, the > deployment can get into a limbo state that requires manual intervention after > suspend with savepoint. If the config maps are not cleaned up, the resumed job > will be considered finished and the operator will recognize the JM deployment as > missing. Because of the check for HA data, which has now been cleaned up, the job fails to > start and a manual redeployment with an initial savepoint is necessary. > This can be avoided by removing any leftover HA config maps after the job has > successfully stopped with savepoint (upgrade mode savepoint). -- This message was sent by Atlassian Jira (v8.20.10#820010)
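A sketch of the cleanup step proposed for FLINK-30004: after a successful stop-with-savepoint, select the HA config maps that belong to the cluster so they can be deleted before the job is resumed. It works on plain data rather than a Kubernetes client, and the label set (`app=<cluster-id>`, `configmap-type=high-availability`, `type=flink-native-kubernetes`) is an assumption about how Flink's Kubernetes HA services label their config maps; verify it against the Flink version in use. `ConfigMapRef` and `HaConfigMapCleanup` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Minimal stand-in for a Kubernetes ConfigMap: just its name and labels. */
record ConfigMapRef(String name, Map<String, String> labels) {}

final class HaConfigMapCleanup {
    private HaConfigMapCleanup() {}

    /**
     * Returns the names of HA config maps of the given cluster that should be deleted.
     * Nothing is selected unless the job has stopped with a savepoint, since otherwise
     * the HA metadata may still be needed for recovery.
     */
    static List<String> leftoverHaConfigMaps(
            List<ConfigMapRef> configMaps, String clusterId, boolean stoppedWithSavepoint) {
        if (!stoppedWithSavepoint) {
            return List.of();
        }
        // Assumed labels of Flink's Kubernetes HA config maps.
        Map<String, String> haLabels = Map.of(
                "app", clusterId,
                "configmap-type", "high-availability",
                "type", "flink-native-kubernetes");
        return configMaps.stream()
                .filter(cm -> haLabels.entrySet().stream()
                        .allMatch(e -> e.getValue().equals(cm.labels().get(e.getKey()))))
                .map(ConfigMapRef::name)
                .collect(Collectors.toList());
    }
}
```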
[jira] [Updated] (FLINK-30004) Cannot resume deployment after suspend with savepoint due to leftover configmaps
[ https://issues.apache.org/jira/browse/FLINK-30004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-30004: - Summary: Cannot resume deployment after suspend with savepoint due to leftover configmaps (was: Cannot resume deployment after suspend with savepoint due to leftover confgmaps) > Cannot resume deployment after suspend with savepoint due to leftover > configmaps > > > Key: FLINK-30004 > URL: https://issues.apache.org/jira/browse/FLINK-30004 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.2.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available
[jira] [Created] (FLINK-30004) Cannot resume deployment after suspend with savepoint due to leftover confgmaps
Thomas Weise created FLINK-30004: Summary: Cannot resume deployment after suspend with savepoint due to leftover confgmaps Key: FLINK-30004 URL: https://issues.apache.org/jira/browse/FLINK-30004 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: 1.2 Reporter: Thomas Weise Assignee: Thomas Weise Due to the possibility of incomplete cleanup of HA data in Flink 1.14, the deployment can get into a limbo state that requires manual intervention after suspend with savepoint. If the config maps are not cleaned up, the resumed job will be considered finished and the operator will recognize the JM deployment as missing. Because of the check for HA data, which has now been cleaned up, the job fails to start and a manual redeployment with an initial savepoint is necessary. This can be avoided by removing any leftover HA config maps after the job has successfully stopped with savepoint (upgrade mode savepoint). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29871) Upgrade operator Flink version and examples to 1.16
[ https://issues.apache.org/jira/browse/FLINK-29871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629128#comment-17629128 ] Thomas Weise commented on FLINK-29871: -- +1 > Upgrade operator Flink version and examples to 1.16 > --- > > Key: FLINK-29871 > URL: https://issues.apache.org/jira/browse/FLINK-29871 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Fix For: kubernetes-operator-1.3.0 > > > We should update our Flink dependency and the default example version to 1.16 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29871) Upgrade operator Flink version and examples to 1.16
[ https://issues.apache.org/jira/browse/FLINK-29871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629085#comment-17629085 ] Thomas Weise commented on FLINK-29871: -- X.Y.0 releases are not vetted and are prone to surprises. Stable environments typically move to an X.Y.1 or later Flink version. Can we support 1.16.x but make it the operator's default only once 1.16.1 is released? > Upgrade operator Flink version and examples to 1.16 > --- > > Key: FLINK-29871 > URL: https://issues.apache.org/jira/browse/FLINK-29871 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Fix For: kubernetes-operator-1.3.0 > > > We should update our Flink dependency and the default example version to 1.16 -- This message was sent by Atlassian Jira (v8.20.10#820010)
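The FLINK-29871 proposal (support 1.16.x, but make it the default only from 1.16.1 on) amounts to a simple selection rule. A hypothetical sketch (`DefaultVersionPolicy` is not an operator class) that keeps every listed version supported and picks the highest one whose patch number is at least 1 as the default:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class DefaultVersionPolicy {
    private DefaultVersionPolicy() {}

    /** Parses "X.Y.Z" into {X, Y, Z}. */
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected X.Y.Z: " + version);
        }
        return new int[] {
            Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2])
        };
    }

    /** All listed versions stay supported; only X.Y.1 or later may become the default. */
    static String chooseDefault(List<String> supportedVersions) {
        Comparator<String> byVersion =
                Comparator.<String, int[]>comparing(DefaultVersionPolicy::parse, Arrays::compare);
        return supportedVersions.stream()
                .filter(v -> parse(v)[2] >= 1)
                .max(byVersion)
                .orElseThrow(() -> new IllegalStateException("no X.Y.1+ release available"));
    }
}
```

For example, with 1.14.6, 1.15.3 and 1.16.0 available, the default stays on 1.15.3 until 1.16.1 is added to the list.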
[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering
[ https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17617676#comment-17617676 ] Thomas Weise commented on FLINK-29634: -- [~Jiale] that PR is the right place to look at for how periodic savepoint triggering was added. Please note, though, that a good portion of it also relates to savepoint history. The operator does not maintain a checkpoint history, as checkpoints are (so far) triggered by Flink internally. Although it may ultimately be good to also track the checkpoints triggered by the operator in the CR status, perhaps it is best to start with just the periodic triggering support? WDYT [~gyfora]? Please note that in order to work on this, the operator first needs to recognize Flink version 1.17 (currently it supports up to 1.16, see FlinkVersion). Then this feature needs to be built so that it is only effective for 1.17+ and the operator keeps working with the other versions. > Support periodic checkpoint triggering > -- > > Key: FLINK-29634 > URL: https://issues.apache.org/jira/browse/FLINK-29634 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Jiale Tan >Priority: Major > > Similar to the support for periodic savepoints, the operator should support > triggering periodic checkpoints to break the incremental checkpoint chain. > Support for external triggering will come with 1.17: > https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
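A minimal sketch of the FLINK-29634 triggering check, gated on Flink 1.17+ and modeled on interval-based periodic savepoints. The class and method names are hypothetical, and the caller is assumed to pass the time of the last operator-triggered checkpoint (or the job start time if there was none).

```java
import java.time.Duration;
import java.time.Instant;

final class PeriodicCheckpointTrigger {
    private PeriodicCheckpointTrigger() {}

    /** Externally triggered checkpoints require Flink 1.17+ (FLINK-27101). */
    static boolean supportsCheckpointTrigger(int flinkMajor, int flinkMinor) {
        return flinkMajor > 1 || (flinkMajor == 1 && flinkMinor >= 17);
    }

    /**
     * @param interval    configured trigger interval; null, zero or negative disables the feature
     * @param lastTrigger last operator-triggered checkpoint, or job start if none
     */
    static boolean shouldTrigger(
            int flinkMajor, int flinkMinor, Duration interval, Instant lastTrigger, Instant now) {
        if (!supportsCheckpointTrigger(flinkMajor, flinkMinor)) {
            return false; // older Flink versions keep working, they just never trigger
        }
        if (interval == null || interval.isZero() || interval.isNegative()) {
            return false;
        }
        return !now.isBefore(lastTrigger.plus(interval));
    }
}
```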
[jira] [Assigned] (FLINK-29634) Support periodic checkpoint triggering
[ https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-29634: Assignee: Jiale Tan > Support periodic checkpoint triggering > -- > > Key: FLINK-29634 > URL: https://issues.apache.org/jira/browse/FLINK-29634 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Jiale Tan >Priority: Major > > Similar to the support for periodic savepoints, the operator should support > triggering periodic checkpoints to break the incremental checkpoint chain. > Support for external triggering will come with 1.17: > https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29634) Support periodic checkpoint triggering
Thomas Weise created FLINK-29634: Summary: Support periodic checkpoint triggering Key: FLINK-29634 URL: https://issues.apache.org/jira/browse/FLINK-29634 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Reporter: Thomas Weise Similar to the support for periodic savepoints, the operator should support triggering periodic checkpoints to break the incremental checkpoint chain. Support for external triggering will come with 1.17: https://issues.apache.org/jira/browse/FLINK-27101 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29497) Provide an option to publish the flink-dist jar file artifact
[ https://issues.apache.org/jira/browse/FLINK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614743#comment-17614743 ] Thomas Weise commented on FLINK-29497: -- Hi [~chesnay], at the moment we are using this to replace flink-dist.jar in our Docker image. Going forward, we may also be interested in using it as a one-stop shop for local development dependencies, since it would result in a more consistent environment setup. Your idea of splitting the current dist makes perfect sense: the dist jar is useful outside the binary distribution, and the full binary distribution could be assembled from the published dist jar. I'm going to open a PR to add the deploy switch in case someone else is interested in using it. > Provide an option to publish the flink-dist jar file artifact > - > > Key: FLINK-29497 > URL: https://issues.apache.org/jira/browse/FLINK-29497 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.16.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > > Currently deployment is skipped for the flink-dist jar file. Instead of > hardcoding that in pom.xml, use a property that can control this behavior > from the maven command line. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-27479) HybridSource refreshes availability future
[ https://issues.apache.org/jira/browse/FLINK-27479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-27479. -- Resolution: Fixed > HybridSource refreshes availability future > -- > > Key: FLINK-27479 > URL: https://issues.apache.org/jira/browse/FLINK-27479 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.14.4 >Reporter: Mason Chen >Assignee: Mason Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > Attachments: hybrid-source-with-fix.html, > hybrid-source-without-fix.html, kafka-source.html > > > HybridSourceReader needs to refresh the availability future according to the > underlying reader. It currently maintains its own future and completes it > after the sub-reader's availability future is complete. However, the > implementation does not refresh the future again until the reader receives a > switch event. This can cause a tight loop with the Flink runtime repeatedly > invoking pollNext() and high CPU utilization. > > To solve this, we can reuse the MultipleFuturesAvailabilityHelper to manage > the lifecycle of the availability future. -- This message was sent by Atlassian Jira (v8.20.10#820010)
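The FLINK-27479 failure mode and the direction of its fix can be illustrated with `CompletableFuture`. This is a simplified, hypothetical helper: `RefreshingAvailability` is not Flink's `MultipleFuturesAvailabilityHelper` and does not use its API. Instead of caching one future that stays completed, every call combines the sub-reader's current future with a switch signal. A waiting caller is woken either when data arrives or when the reader switches, and is not handed an already-completed future once the sub-reader has become unavailable again (the cause of the tight `pollNext()` loop).

```java
import java.util.concurrent.CompletableFuture;

/** Hands out availability futures that track the current sub-reader (hypothetical helper). */
final class RefreshingAvailability {
    private CompletableFuture<Void> subReaderFuture;
    private CompletableFuture<Void> switchSignal = new CompletableFuture<>();

    RefreshingAvailability(CompletableFuture<Void> initialSubReaderFuture) {
        this.subReaderFuture = initialSubReaderFuture;
    }

    /** Called when the active sub-reader hands out a new (possibly pending) future. */
    void updateSubReaderFuture(CompletableFuture<Void> future) {
        this.subReaderFuture = future;
    }

    /** Called on a source switch: adopt the new reader and wake current waiters. */
    void onSwitch(CompletableFuture<Void> newSubReaderFuture) {
        this.subReaderFuture = newSubReaderFuture;
        CompletableFuture<Void> previous = switchSignal;
        switchSignal = new CompletableFuture<>();
        previous.complete(null);
    }

    /** Builds a fresh future on every call instead of reusing a completed one. */
    CompletableFuture<Void> getAvailableFuture() {
        return CompletableFuture.anyOf(subReaderFuture, switchSignal).thenAccept(ignored -> {});
    }
}
```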
[jira] [Updated] (FLINK-27479) HybridSource refreshes availability future
[ https://issues.apache.org/jira/browse/FLINK-27479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27479: - Fix Version/s: (was: 1.15.3) > HybridSource refreshes availability future > -- > > Key: FLINK-27479 > URL: https://issues.apache.org/jira/browse/FLINK-27479 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.14.4 >Reporter: Mason Chen >Assignee: Mason Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > Attachments: hybrid-source-with-fix.html, > hybrid-source-without-fix.html, kafka-source.html > > > HybridSourceReader needs to refresh the availability future according to the > underlying reader. It currently maintains its own future and completes it > after the sub-reader's availability future is complete. However, the > implementation does not refresh the future again until the reader receives a > switch event. This can cause a tight loop with the Flink runtime repeatedly > invoking pollNext() and high CPU utilization. > > To solve this, we can reuse the MultipleFuturesAvailabilityHelper to manage > the lifecycle of the availability future. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29497) Provide an option to publish the flink-dist jar file artifact
Thomas Weise created FLINK-29497: Summary: Provide an option to publish the flink-dist jar file artifact Key: FLINK-29497 URL: https://issues.apache.org/jira/browse/FLINK-29497 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 1.16.0 Reporter: Thomas Weise Assignee: Thomas Weise Currently deployment is skipped for the flink-dist jar file. Instead of hardcoding that in pom.xml, use a property that can control this behavior from the maven command line. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29413) Make it possible to associate triggered and completed savepoints
[ https://issues.apache.org/jira/browse/FLINK-29413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17609561#comment-17609561 ] Thomas Weise commented on FLINK-29413: -- +1 for tracking the triggering nonce. Would there be any other way to retrieve the savepoint type, given that Flink does not retain a history beyond the current job execution? > Make it possible to associate triggered and completed savepoints > > > Key: FLINK-29413 > URL: https://issues.apache.org/jira/browse/FLINK-29413 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Major > > Currently it is not clear how one would associate completed manual savepoints > with savepointTriggerNonce values when using the operator. > This makes it difficult to track when a savepoint was completed vs. when it > was abandoned. > One idea would be to add the savepointTriggerNonce to the completed > checkpoint info for Manual savepoints.
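The idea in the issue (store the savepointTriggerNonce with the completed savepoint info) can be sketched as a small lookup. `CompletedSavepoint` and `SavepointHistory` are hypothetical classes for illustration only, not the operator's status types; the nonce is modeled as a `long`. A nonce with no matching completed entry is then either still pending or was abandoned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical record of a completed manual savepoint, tagged with the nonce that requested it. */
final class CompletedSavepoint {
    final String location;
    final long triggerNonce;

    CompletedSavepoint(String location, long triggerNonce) {
        this.location = location;
        this.triggerNonce = triggerNonce;
    }
}

/** Hypothetical history kept alongside the resource status (not the operator's actual status class). */
final class SavepointHistory {
    private final List<CompletedSavepoint> completed = new ArrayList<>();

    /** When a manual savepoint completes, store the nonce together with the result. */
    void recordCompleted(String location, long triggerNonce) {
        completed.add(new CompletedSavepoint(location, triggerNonce));
    }

    /** An empty result means the savepoint for this nonce has not completed (pending or abandoned). */
    Optional<CompletedSavepoint> findByNonce(long triggerNonce) {
        for (CompletedSavepoint sp : completed) {
            if (sp.triggerNonce == triggerNonce) {
                return Optional.of(sp);
            }
        }
        return Optional.empty();
    }
}
```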
[jira] [Closed] (FLINK-29251) Send CREATED status and Cancel event via FlinkResourceListener
[ https://issues.apache.org/jira/browse/FLINK-29251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise closed FLINK-29251. Resolution: Fixed > Send CREATED status and Cancel event via FlinkResourceListener > -- > > Key: FLINK-29251 > URL: https://issues.apache.org/jira/browse/FLINK-29251 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Matyas Orhidi >Assignee: Matyas Orhidi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > To complete the lifecycle history of a custom resource, the operator should > send: > * CREATED status notification during initial deployment of a CR > * Cancel event when deleting a CR
[jira] [Closed] (FLINK-29100) Deployment with last-state upgrade mode stuck after initial error
[ https://issues.apache.org/jira/browse/FLINK-29100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise closed FLINK-29100. Resolution: Fixed > Deployment with last-state upgrade mode stuck after initial error > - > > Key: FLINK-29100 > URL: https://issues.apache.org/jira/browse/FLINK-29100 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > A deployment with last-state upgrade mode that never succeeds will be stuck > in the deploying state because no HA data exists. This can be reproduced by > creating a deployment with an invalid image or an exception in the entry point. An update > to the CR that corrects the issue won't be reconciled due to > "o.a.f.k.o.r.d.ApplicationReconciler [INFO ] > [default.basic-checkpoint-ha-example] Job is not running yet and HA metadata > is not available, waiting for upgradeable state". This forces manual > intervention to delete the CR. > Instead, the operator should check if this is the initial deployment and, if so, > skip the HA metadata check.
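The proposed behavior boils down to a small decision rule. The sketch below is a simplified, hypothetical version of that gate (not the operator's ApplicationReconciler code): `DeploymentState`, `LastStateUpgradeGate` and the `everDeployedSuccessfully` flag are assumptions. How to detect "initial deployment" reliably is exactly what FLINK-29159 says needs hardening.

```java
/** Hypothetical view of what the reconciler knows about a last-state deployment. */
final class DeploymentState {
    final boolean jobRunning;
    final boolean haMetadataAvailable;
    /** Hypothetical flag: true once any job for this resource has been deployed successfully. */
    final boolean everDeployedSuccessfully;

    DeploymentState(boolean jobRunning, boolean haMetadataAvailable, boolean everDeployedSuccessfully) {
        this.jobRunning = jobRunning;
        this.haMetadataAvailable = haMetadataAvailable;
        this.everDeployedSuccessfully = everDeployedSuccessfully;
    }
}

final class LastStateUpgradeGate {
    /** Returns true if a corrected spec may be applied now. */
    static boolean mayApplySpecChange(DeploymentState s) {
        if (!s.everDeployedSuccessfully) {
            // Initial deployment that never came up: there is no HA metadata to preserve,
            // so waiting for it would block forever. Redeploy with the corrected spec.
            return true;
        }
        // Otherwise keep the existing guard ("Job is not running yet and HA metadata
        // is not available, waiting for upgradeable state").
        return s.jobRunning || s.haMetadataAvailable;
    }
}
```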
[jira] [Updated] (FLINK-29100) Deployment with last-state upgrade mode stuck after initial error
[ https://issues.apache.org/jira/browse/FLINK-29100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-29100: - Fix Version/s: kubernetes-operator-1.2.0
[jira] [Created] (FLINK-29159) Revisit/harden initial deployment logic
Thomas Weise created FLINK-29159: Summary: Revisit/harden initial deployment logic Key: FLINK-29159 URL: https://issues.apache.org/jira/browse/FLINK-29159 Project: Flink Issue Type: Technical Debt Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.1.0 Reporter: Thomas Weise Found that the isFirstDeployment logic does not work as expected for a deployment that never deployed successfully (image pull error). We are probably also lacking test coverage for the initialSavepointPath field.
[jira] [Commented] (FLINK-29144) Enable multiple jar entries for jarURI
[ https://issues.apache.org/jira/browse/FLINK-29144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598040#comment-17598040 ] Thomas Weise commented on FLINK-29144: -- Please note that the operator is not in the business of interpreting that config, which also means it will not list directories to assemble a jar file list. Jar files don't have to be "local" either. > Enable multiple jar entries for jarURI > -- > > Key: FLINK-29144 > URL: https://issues.apache.org/jira/browse/FLINK-29144 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Arseniy Tashoyan >Priority: Major > > The setting _job.jarURI_ accepts a string with the path to the jar file: > {code:yaml} > job: > jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar > {code} > This could be improved to accept a list of jars: > {code:yaml} > job: > jarURIs: > - local:///opt/flink/examples/streaming/StateMachineExample.jar > - local:///opt/common/scala-logging.jar > {code} > This could also be improved to accept one or more directories with jars: > {code:yaml} > job: > jarDirs: > - local:///opt/app/lib > - local:///opt/common/lib > {code} > The order of entries in the list defines the order of jars in the classpath. > Internally, the Flink Kubernetes Operator uses the property _pipeline.jars_ - see > [FlinkConfigBuilder.java > |https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L259]: > {code:java} > effectiveConfig.set(PipelineOptions.JARS, > Collections.singletonList(uri.toString())); > {code} > The property _pipeline.jars_ allows passing more than one jar entry. > This improvement would make it possible to avoid building a fat jar. Instead, we could provide > directories with normal jars.
[jira] [Commented] (FLINK-29144) Enable multiple jar entries for jarURI
[ https://issues.apache.org/jira/browse/FLINK-29144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597903#comment-17597903 ] Thomas Weise commented on FLINK-29144: -- [~gyfora] since this isn't a common requirement, I would also prefer we allow using the configuration parameter PipelineOptions.JARS for this. That would mean the operator needs a slight change to not just overwrite that parameter.
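The "don't just overwrite pipeline.jars" suggestion can be sketched as a merge step. `JarListMerger.effectiveJars` is a hypothetical helper, and putting jarURI first is an assumption about classpath order; the resulting list is what could be passed to the `effectiveConfig.set(PipelineOptions.JARS, ...)` call quoted in the issue in place of `Collections.singletonList(uri.toString())`. In line with the comment on FLINK-29144, entries are passed through unchanged: no directory listing, no "local" requirement.

```java
import java.util.ArrayList;
import java.util.List;

final class JarListMerger {
    /**
     * Hypothetical merge: jarURI first (if set), then user-supplied pipeline.jars entries in
     * their given order, skipping exact duplicates. URIs are not interpreted or expanded.
     */
    static List<String> effectiveJars(String jarUri, List<String> userJars) {
        List<String> result = new ArrayList<>();
        if (jarUri != null) {
            result.add(jarUri);
        }
        for (String jar : userJars) {
            if (!result.contains(jar)) {
                result.add(jar);
            }
        }
        return result;
    }
}
```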
[jira] [Created] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode
Thomas Weise created FLINK-29109: Summary: Checkpoint path conflict with stateless upgrade mode Key: FLINK-29109 URL: https://issues.apache.org/jira/browse/FLINK-29109 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.1.0 Reporter: Thomas Weise Assignee: Thomas Weise A stateful job with stateless upgrade mode (yes, there are such use cases) fails with a checkpoint path conflict due to the constant jobId and FLINK-19358 (applies to Flink < 1.16). Since the checkpoint ID resets on restart with stateless upgrade mode, the job is going to write to previously used locations and fail. The workaround is to rotate the jobId on every redeploy when the upgrade mode is stateless. While this can be worked around externally, it is best done in the operator itself, because reconciliation determines when a restart is actually required, whereas rotating the jobId externally may trigger unnecessary restarts.
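The proposed workaround (rotate the jobId on redeploy only in stateless mode) can be sketched as follows, assuming checkpoints land under a job-id-specific directory so a fresh id means fresh checkpoint paths. `JobIdPolicy` and its enum are hypothetical; the id is built from a random UUID and formatted as 32 hex characters, the textual width of a Flink JobID. In operator code, Flink's own `org.apache.flink.api.common.JobID` would be the natural type instead.

```java
import java.util.UUID;

final class JobIdPolicy {
    /** Mirrors the operator's upgrade modes for this sketch only. */
    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    /** Random 128-bit id rendered as 32 lowercase hex characters. */
    static String randomJobId() {
        UUID uuid = UUID.randomUUID();
        // %016x renders each long as exactly 16 hex digits (negative values as unsigned).
        return String.format("%016x%016x", uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
    }

    /**
     * Meant to be called only after reconciliation has decided a redeploy is needed:
     * stateless upgrades get a fresh id so checkpoint paths cannot collide,
     * stateful modes keep the current id.
     */
    static String jobIdForRedeploy(UpgradeMode mode, String currentJobId) {
        return mode == UpgradeMode.STATELESS ? randomJobId() : currentJobId;
    }
}
```

Calling this from the reconciler rather than from an external script matches the issue's argument: the id only changes when a restart is actually happening.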
[jira] [Created] (FLINK-29100) Deployment with last-state upgrade mode stuck after initial error
Thomas Weise created FLINK-29100: Summary: Deployment with last-state upgrade mode stuck after initial error Key: FLINK-29100 URL: https://issues.apache.org/jira/browse/FLINK-29100 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.1.0 Reporter: Thomas Weise Assignee: Thomas Weise A deployment with last-state upgrade mode that never succeeds will be stuck in the deploying state because no HA data exists. This can be reproduced by creating a deployment with an invalid image or an exception in the entry point. An update to the CR that corrects the issue won't be reconciled due to "o.a.f.k.o.r.d.ApplicationReconciler [INFO ] [default.basic-checkpoint-ha-example] Job is not running yet and HA metadata is not available, waiting for upgradeable state". This forces manual intervention to delete the CR. Instead, the operator should check if this is the initial deployment and, if so, skip the HA metadata check.
[jira] [Resolved] (FLINK-28977) NullPointerException in HybridSourceSplitEnumerator.close
[ https://issues.apache.org/jira/browse/FLINK-28977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-28977. -- Resolution: Fixed > NullPointerException in HybridSourceSplitEnumerator.close > - > > Key: FLINK-28977 > URL: https://issues.apache.org/jira/browse/FLINK-28977 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.14.4, 1.15.1 >Reporter: Michael >Assignee: Michael >Priority: Major > Labels: pull-request-available, pull-requests-available > Fix For: 1.17.0 > > > A HybridSource pipeline has an intermittent error when reading from S3. Usually > the error goes away when the pipeline restarts after recovering from a checkpoint, > but intermittently the following happens: > 2022/08/02 22:26:51.435 INFO o.a.f.runtime.jobmaster.JobMaster - Trying to > recover from a global failure. > org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: hybrid-source -> decrypt -> map2Events -> > filterOutNulls -> assignTimestampsAndWatermarks -> logRawJson' (operator > fd9fbc680ee884c4eafd0b9c2d3d007f). > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.cleanAndFailJob(RecreateOnResetOperatorCoordinator.java:393) > ... > Caused by: java.lang.NullPointerException: null > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.close(HybridSourceSplitEnumerator.java:246) > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.close(SourceCoordinator.java:151) > at > org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:70) > at java.lang.Thread.run(Thread.java:750)
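The stack trace shows close() dereferencing a field that is still null during global failover. One plausible guard is sketched below; it is an assumption for illustration, not the merged Flink patch. `Enumerator` and `SwitchingEnumerator` are hypothetical stand-ins for Flink's SplitEnumerator and HybridSourceSplitEnumerator.

```java
/** Hypothetical minimal enumerator contract; close() is narrowed to throw no checked exceptions. */
interface Enumerator extends AutoCloseable {
    @Override
    void close();
}

/** Hypothetical switching enumerator whose underlying enumerator may not exist yet. */
final class SwitchingEnumerator implements AutoCloseable {
    // May still be null if the job fails before the first underlying enumerator is created.
    private Enumerator current;

    void setCurrent(Enumerator enumerator) {
        this.current = enumerator;
    }

    @Override
    public void close() {
        // Guard: close() can run during failover before any underlying enumerator exists.
        if (current != null) {
            current.close();
        }
    }
}
```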
[jira] [Updated] (FLINK-28977) NullPointerException in HybridSourceSplitEnumerator.close
[ https://issues.apache.org/jira/browse/FLINK-28977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-28977: - Fix Version/s: 1.17.0
[jira] [Updated] (FLINK-21126) Reconsider FLINK_PROPERTIES
[ https://issues.apache.org/jira/browse/FLINK-21126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-21126: - Fix Version/s: (was: 1.17.0) > Reconsider FLINK_PROPERTIES > --- > > Key: FLINK-21126 > URL: https://issues.apache.org/jira/browse/FLINK-21126 > Project: Flink > Issue Type: Improvement > Components: Deployment / Scripts >Reporter: Chesnay Schepler >Priority: Minor > Labels: auto-deprioritized-major > > The docker scripts support a {{FLINK_PROPERTIES}} environment variable that > contains a multi-line string of config options, which is piped into the > configuration. > This variable is somewhat redundant, because docker users can also specify a > list of dynamic properties when starting a container, and Kubernetes users > are advised to use config maps instead. > FLIP-161 might also introduce new ways to configure Flink via environment > variables.
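To make concrete what a multi-line FLINK_PROPERTIES value contains, here is a hypothetical parser for the same `key: value` per-line format. It is only an illustration: the docker scripts pipe the string into the configuration rather than parsing it like this. Skipping `#` comments and letting later keys win are assumptions of the sketch. In practice the input would come from `System.getenv("FLINK_PROPERTIES")`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FlinkPropertiesParser {
    /** Parses "key: value" lines; blank lines and '#' comments are skipped; later keys win. */
    static Map<String, String> parse(String flinkProperties) {
        Map<String, String> conf = new LinkedHashMap<>();
        if (flinkProperties == null) {
            return conf;
        }
        for (String raw : flinkProperties.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Split at the first colon only, so values such as s3://... stay intact.
            int sep = line.indexOf(':');
            if (sep <= 0) {
                continue; // not a "key: value" line
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 1).trim());
        }
        return conf;
    }
}
```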
[jira] [Assigned] (FLINK-28977) NullPointerException in HybridSourceSplitEnumerator.close
[ https://issues.apache.org/jira/browse/FLINK-28977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-28977: Assignee: Michael
[jira] [Updated] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-28817: - Fix Version/s: 1.15.2 > NullPointerException in HybridSource when restoring from checkpoint > --- > > Key: FLINK-28817 > URL: https://issues.apache.org/jira/browse/FLINK-28817 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.14.4, 1.15.1 >Reporter: Michael >Assignee: Qishang Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.2 > > Attachments: Preconditions-checkNotNull-error.zip, > bf-29-JM-err-analysis.log > > > Scenario: > # CheckpointCoordinator - Completed checkpoint 14 for job > > # HybridSource successfully completed processing a few SourceFactories that > read from S3 > # HybridSourceSplitEnumerator.switchEnumerator failed with > com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed > out. This is an intermittent error; it is usually fixed when Flink recovers from the > checkpoint and repeats the operation. > # Flink starts recovering from the checkpoint. > # HybridSourceSplitEnumerator receives > SourceReaderFinishedEvent\{sourceIndex=-1} > # Processing this event causes: > 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught > exception in the SplitEnumerator for Source Source: hybrid-source while > handling operator event SourceEventWrapper[SourceReaderFinishedEvent > {sourceIndex=-1} > ] from subtask 6. Triggering job failover. 
> java.lang.NullPointerException: Source for index=0 is not available from > sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3} > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226) > ... > I'm running my own version of the Hybrid Sources with additional logging, so line > numbers and some names may differ from the Flink code on GitHub. > My observation: the problem is intermittent; sometimes it works fine, i.e. > SourceReaderFinishedEvent comes with the correct sourceIndex. As I see from my > log, it happens if my SourceFactory.create() is executed BEFORE > HybridSourceSplitEnumerator - handleSourceEvent > SourceReaderFinishedEvent\{sourceIndex=-1}. > If HybridSourceSplitEnumerator - handleSourceEvent is executed before my > SourceFactory.create(), then sourceIndex=-1 in SourceReaderFinishedEvent. > The Preconditions-checkNotNull-error log from the JobManager is attached.
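The log above shows a reader reporting sourceIndex=-1 after restore, while the enumerator only holds source 788, so a lookup of index 0 (that is, -1 + 1) fails. A hypothetical guard is to send a reader that is behind to the enumerator's current source rather than to finishedIndex + 1. This is a sketch of that reasoning under stated assumptions, not necessarily the fix merged for FLINK-28817; `SwitchDecision.nextSourceIndex` is an invented name.

```java
final class SwitchDecision {
    /**
     * Hypothetical: picks the source index a reader should switch to after it reports that it
     * finished source {@code finishedIndex}. A value of -1 means the reader has not read any
     * source yet (for example, right after restoring from a checkpoint).
     */
    static int nextSourceIndex(int finishedIndex, int currentEnumeratorIndex) {
        if (finishedIndex < currentEnumeratorIndex) {
            // Reader is behind: earlier sources may no longer be materialized after restore,
            // so point it at the enumerator's current source instead of finishedIndex + 1.
            return currentEnumeratorIndex;
        }
        // Reader finished the current source: advance to the next one.
        return finishedIndex + 1;
    }
}
```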
[jira] [Commented] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579192#comment-17579192 ] Thomas Weise commented on FLINK-28817: -- [~Benenson] thank you for the thorough investigation!
[jira] [Assigned] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-28817: Assignee: Qishang Zhong
[jira] [Updated] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-28817: - Fix Version/s: 1.16.0
[jira] [Resolved] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-28817. -- Resolution: Fixed
[jira] [Commented] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579190#comment-17579190 ] Thomas Weise commented on FLINK-28817: -- [~nicholasjiang] I believe it does. Can you please verify and close FLINK-26938 if so? > NullPointerException in HybridSource when restoring from checkpoint > --- > > Key: FLINK-28817 > URL: https://issues.apache.org/jira/browse/FLINK-28817 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.14.4, 1.15.1 >Reporter: Michael >Priority: Major > Labels: pull-request-available > Attachments: Preconditions-checkNotNull-error.zip, > bf-29-JM-err-analysis.log > > > Scenario: > # CheckpointCoordinator - Completed checkpoint 14 for job > > # HybridSource successfully completed processing a few SourceFactories that > read from S3 > # HybridSourceSplitEnumerator.switchEnumerator failed with > com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed > out. This is an intermittent error; it is usually resolved when Flink recovers from > the checkpoint and repeats the operation. > # Flink starts recovering from checkpoint, > # HybridSourceSplitEnumerator receives > SourceReaderFinishedEvent\{sourceIndex=-1} > # Processing this event causes > 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught > exception in the SplitEnumerator for Source Source: hybrid-source while > handling operator event SourceEventWrapper[SourceReaderFinishedEvent > {sourceIndex=-1} > ] from subtask 6. Triggering job failover. 
> java.lang.NullPointerException: Source for index=0 is not available from > sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3} > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226) > ... > I'm running my version of the Hybrid Sources with additional logging, so line > numbers & some names could be different from Flink Github. > My Observation: the problem is intermittent, sometimes it works ok, i.e. > SourceReaderFinishedEvent comes with correct sourceIndex. As I see from my > log, it happens if my SourceFactory.create() is executed BEFORE > HybridSourceSplitEnumerator - handleSourceEvent > SourceReaderFinishedEvent\{sourceIndex=-1}. > If HybridSourceSplitEnumerator - handleSourceEvent is executed before my > SourceFactory.create(), then sourceIndex=-1 in SourceReaderFinishedEvent > Preconditions-checkNotNull-error log from JobMgr is attached -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578507#comment-17578507 ] Thomas Weise edited comment on FLINK-28817 at 8/11/22 2:29 PM: --- [~Benenson] thanks for investigating this issue. I think that has to do with the reader not having any restored splits (most likely because none were previously assigned) and therefore reporting -1 back to the enumerator. Let me check what the correct fix for this is. was (Author: thw): [~Benenson] thanks for investigating this issue. I think that has to do with the reader not having any restored splits (most likely because non were assigned) and therefore reporting -1 back to the enumerator. Let me check what the correct fix for this is. > NullPointerException in HybridSource when restoring from checkpoint > --- > > Key: FLINK-28817 > URL: https://issues.apache.org/jira/browse/FLINK-28817 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.14.4, 1.15.1 >Reporter: Michael >Priority: Major > Labels: pull-request-available > Attachments: Preconditions-checkNotNull-error.zip, > bf-29-JM-err-analysis.log > > > Scenario: > # CheckpointCoordinator - Completed checkpoint 14 for job > > # HybridSource successfully completed processing a few SourceFactories that > read from S3 > # HybridSourceSplitEnumerator.switchEnumerator failed with > com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed > out. This is an intermittent error; it is usually resolved when Flink recovers from > the checkpoint and repeats the operation. 
> # Flink starts recovering from checkpoint, > # HybridSourceSplitEnumerator receives > SourceReaderFinishedEvent\{sourceIndex=-1} > # Processing this event causes > 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught > exception in the SplitEnumerator for Source Source: hybrid-source while > handling operator event SourceEventWrapper[SourceReaderFinishedEvent > {sourceIndex=-1} > ] from subtask 6. Triggering job failover. > java.lang.NullPointerException: Source for index=0 is not available from > sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3} > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226) > ... > I'm running my version of the Hybrid Sources with additional logging, so line > numbers & some names could be different from Flink Github. > My Observation: the problem is intermittent, sometimes it works ok, i.e. > SourceReaderFinishedEvent comes with correct sourceIndex. As I see from my > log, it happens if my SourceFactory.create() is executed BEFORE > HybridSourceSplitEnumerator - handleSourceEvent > SourceReaderFinishedEvent\{sourceIndex=-1}. > If HybridSourceSplitEnumerator - handleSourceEvent is executed before my > SourceFactory.create(), then sourceIndex=-1 in SourceReaderFinishedEvent > Preconditions-checkNotNull-error log from JobMgr is attached -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578507#comment-17578507 ] Thomas Weise commented on FLINK-28817: -- [~Benenson] thanks for investigating this issue. I think that has to do with the reader not having any restored splits (most likely because none were assigned) and therefore reporting -1 back to the enumerator. Let me check what the correct fix for this is. > NullPointerException in HybridSource when restoring from checkpoint > --- > > Key: FLINK-28817 > URL: https://issues.apache.org/jira/browse/FLINK-28817 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.14.4, 1.15.1 >Reporter: Michael >Priority: Major > Labels: pull-request-available > Attachments: Preconditions-checkNotNull-error.zip, > bf-29-JM-err-analysis.log > > > Scenario: > # CheckpointCoordinator - Completed checkpoint 14 for job > > # HybridSource successfully completed processing a few SourceFactories that > read from S3 > # HybridSourceSplitEnumerator.switchEnumerator failed with > com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed > out. This is an intermittent error; it is usually resolved when Flink recovers from > the checkpoint and repeats the operation. > # Flink starts recovering from checkpoint, > # HybridSourceSplitEnumerator receives > SourceReaderFinishedEvent\{sourceIndex=-1} > # Processing this event causes > 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught > exception in the SplitEnumerator for Source Source: hybrid-source while > handling operator event SourceEventWrapper[SourceReaderFinishedEvent > {sourceIndex=-1} > ] from subtask 6. Triggering job failover. 
> java.lang.NullPointerException: Source for index=0 is not available from > sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3} > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152) > at > org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226) > ... > I'm running my version of the Hybrid Sources with additional logging, so line > numbers & some names could be different from Flink Github. > My Observation: the problem is intermittent, sometimes it works ok, i.e. > SourceReaderFinishedEvent comes with correct sourceIndex. As I see from my > log, it happens if my SourceFactory.create() is executed BEFORE > HybridSourceSplitEnumerator - handleSourceEvent > SourceReaderFinishedEvent\{sourceIndex=-1}. > If HybridSourceSplitEnumerator - handleSourceEvent is executed before my > SourceFactory.create(), then sourceIndex=-1 in SourceReaderFinishedEvent > Preconditions-checkNotNull-error log from JobMgr is attached -- This message was sent by Atlassian Jira (v8.20.10#820010)
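The comment above points at the mechanism: a reader restored without splits reports sourceIndex=-1, and the enumerator then asks for a source that was never registered (the trace shows index 0 missing from a map that only holds 788). The following is a minimal Java sketch of that failure and one possible defensive guard. It assumes a plain map of sources, a hypothetical NO_SPLITS_RESTORED sentinel, and a naive handler that derives the next index as reported + 1. The class and method names are illustrative only; this is not the fix that was merged for FLINK-28817.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the FLINK-28817 failure mode; names are illustrative, not Flink's API. */
public class HybridIndexSketch {
    // Hypothetical sentinel: a reader restored without any splits has no source index to report.
    static final int NO_SPLITS_RESTORED = -1;

    private final Map<Integer, String> sources = new HashMap<>();
    private final int currentSourceIndex;

    HybridIndexSketch(int currentSourceIndex, String currentSource) {
        this.currentSourceIndex = currentSourceIndex;
        sources.put(currentSourceIndex, currentSource);
    }

    /** checkNotNull-style lookup: fails for any index that was never registered. */
    String sourceOf(int index) {
        String source = sources.get(index);
        if (source == null) {
            throw new NullPointerException(
                    "Source for index=" + index + " is not available from sources: " + sources);
        }
        return source;
    }

    /** Guard: treat the sentinel as "unknown" and fall back to the enumerator's own current index. */
    int resolveFinishedIndex(int reportedIndex) {
        return reportedIndex == NO_SPLITS_RESTORED ? currentSourceIndex : reportedIndex;
    }

    public static void main(String[] args) {
        HybridIndexSketch enumerator = new HybridIndexSketch(788, "fileSource");
        try {
            // Assumption: a naive handler derives the next index as reported + 1, i.e. 0 here.
            enumerator.sourceOf(NO_SPLITS_RESTORED + 1);
        } catch (NullPointerException e) {
            System.out.println("naive: " + e.getMessage());
        }
        int resolved = enumerator.resolveFinishedIndex(NO_SPLITS_RESTORED);
        System.out.println("guarded: " + enumerator.sourceOf(resolved));
    }
}
```

Falling back to the enumerator's own index is only one option; whether it is safe depends on what state the enumerator itself restored from the checkpoint.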
[jira] [Commented] (FLINK-28722) Hybrid Source should use .equals() for Integer comparison
[ https://issues.apache.org/jira/browse/FLINK-28722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17572148#comment-17572148 ] Thomas Weise commented on FLINK-28722: -- [~mason6345] please take a look at the PR in the linked JIRA. > Hybrid Source should use .equals() for Integer comparison > - > > Key: FLINK-28722 > URL: https://issues.apache.org/jira/browse/FLINK-28722 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.1 >Reporter: Mason Chen >Priority: Major > Fix For: 1.16.0, 1.15.2 > > > HybridSource should use .equals() for Integer comparison when filtering out the > underlying sources. Otherwise the HybridSource stops working once it gets past 127 sources, because == on boxed Integers is only reliable for values in the Integer cache (-128 to 127 by default). > https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358 > > A user reported this issue here: > https://lists.apache.org/thread/7h2rblsdt7rjf85q9mhfht77bghtbswh -- This message was sent by Atlassian Jira (v8.20.10#820010)
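The underlying Java pitfall is that == on two Integer objects compares references, and autoboxing (Integer.valueOf) is only guaranteed to return identical cached objects for values from -128 to 127. Below is a minimal sketch that contrasts identity filtering with value filtering. The two filter methods are hypothetical stand-ins for the code at the linked HybridSourceSplitEnumerator line, not copies of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-ins for filtering source indices; not the actual HybridSource code. */
public class BoxedIndexFilter {
    /** Buggy: reference comparison, only reliable for cached values (at least -128..127). */
    static List<Integer> filterByIdentity(List<Integer> indices, Integer target) {
        List<Integer> matches = new ArrayList<>();
        for (Integer index : indices) {
            if (index == target) { // compares object identity, not numeric value
                matches.add(index);
            }
        }
        return matches;
    }

    /** Fixed: value comparison works for every index. */
    static List<Integer> filterByValue(List<Integer> indices, Integer target) {
        List<Integer> matches = new ArrayList<>();
        for (Integer index : indices) {
            if (Objects.equals(index, target)) {
                matches.add(index);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Integer> indices = List.of(126, 127, 128, 129);
        System.out.println(filterByValue(indices, 128)); // always [128]
        // Usually [] on a default JVM, because 128 falls outside the Integer cache;
        // the exact result depends on the cache size (-XX:AutoBoxCacheMax).
        System.out.println(filterByIdentity(indices, 128));
    }
}
```

The identity version silently returns fewer matches rather than failing loudly, which is why this kind of bug tends to surface only once the index range grows past the cache.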
[jira] [Closed] (FLINK-28722) Hybrid Source should use .equals() for Integer comparison
[ https://issues.apache.org/jira/browse/FLINK-28722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise closed FLINK-28722. Resolution: Duplicate > Hybrid Source should use .equals() for Integer comparison > - > > Key: FLINK-28722 > URL: https://issues.apache.org/jira/browse/FLINK-28722 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.1 >Reporter: Mason Chen >Priority: Major > Fix For: 1.16.0, 1.15.2 > > > HybridSource should use .equals() for Integer comparison when filtering out the > underlying sources. Otherwise the HybridSource stops working once it gets past 127 sources, because == on boxed Integers is only reliable for values in the Integer cache (-128 to 127 by default). > https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358 > > A user reported this issue here: > https://lists.apache.org/thread/7h2rblsdt7rjf85q9mhfht77bghtbswh -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27650) First environment variable of top level pod template is lost
[ https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27650: - Fix Version/s: (was: kubernetes-operator-1.1.0) > First environment variable of top level pod template is lost > > > Key: FLINK-27650 > URL: https://issues.apache.org/jira/browse/FLINK-27650 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-0.1.0 >Reporter: Simon Paradis >Priority: Major > Attachments: flink-27650.yaml > > > I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* > to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template > feature to inject environment variables that control structured JSON logging. > I noticed that the first defined environment variable is never injected into either the > JobManager or the TaskManager pods. The workaround is to define a dummy > environment variable. > Here's the manifest template. It gets processed by a tool that first > expands ${ENV_VAR} references with values provided by our CI pipeline. We > should not have to create the FLINK_COORDINATES_DUMMY env var. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28541) Add OwnerReferences to FlinkDeployment CR in jobmanager Deployment
[ https://issues.apache.org/jira/browse/FLINK-28541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566505#comment-17566505 ] Thomas Weise commented on FLINK-28541: -- See https://issues.apache.org/jira/browse/FLINK-26812 > Add OwnerReferences to FlinkDeployment CR in jobmanager Deployment > -- > > Key: FLINK-28541 > URL: https://issues.apache.org/jira/browse/FLINK-28541 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: ChangZhuo Chen (陳昌倬) >Priority: Major > > `ownerReferences` is used by Argo CD > (https://argo-cd.readthedocs.io/en/stable/) to display the relationships between > resources. Since there is no `ownerReferences` in the jobmanager Deployment, Argo > CD cannot tell that this Deployment was created by the FlinkDeployment CR. Thus Argo CD > cannot display all resources managed by the FlinkDeployment CR. > Discussion thread: > https://apache-flink.slack.com/archives/C03G7LJTS2G/p1657639397473729 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28364) Python Job support for Kubernetes Operator
[ https://issues.apache.org/jira/browse/FLINK-28364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563304#comment-17563304 ] Thomas Weise commented on FLINK-28364: -- Thanks for the ping [~gyfora]. This is something that can be solved nicely with a custom application image and does not require any change to the operator. I had done something similar in the past: https://github.com/lyft/flinkk8soperator/tree/master/examples/beam-python > Python Job support for Kubernetes Operator > -- > > Key: FLINK-28364 > URL: https://issues.apache.org/jira/browse/FLINK-28364 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: wu3396 >Priority: Major > > *Describe the solution* > I want the operator to support PyFlink as a job type. > *Describe alternatives* > Similar to [here|https://github.com/spotify/flink-on-k8s-operator/pull/165] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27009) Support SQL job submission in flink kubernetes opeartor
[ https://issues.apache.org/jira/browse/FLINK-27009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17562716#comment-17562716 ] Thomas Weise commented on FLINK-27009: -- I would prefer if any SQL processing logic remains separated from the operator. That would also mean a fat client that performs translation and requires additional dependencies should not execute in the operator. I think that can be accomplished by providing a separate entry point that executes in application mode and then the user can supply any other dependencies as needed as part of the image. Is that the plan? > Support SQL job submission in flink kubernetes opeartor > --- > > Key: FLINK-27009 > URL: https://issues.apache.org/jira/browse/FLINK-27009 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Biao Geng >Assignee: Gyula Fora >Priority: Major > > Currently, the Flink Kubernetes operator supports JAR jobs using application or > session clusters. For SQL jobs, there is no out-of-the-box solution in the > operator. > One simple, short-term solution is to wrap the SQL script into a JAR job > using the Table API, with some limitations. > The long-term solution may work with > [FLINK-26541|https://issues.apache.org/jira/browse/FLINK-26541] to achieve > full support. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-26891) Emit events for important Deployment / Job changes
[ https://issues.apache.org/jira/browse/FLINK-26891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17558113#comment-17558113 ] Thomas Weise edited comment on FLINK-26891 at 6/23/22 2:33 PM: --- This will add events that will provide more visibility to the user via k8s events. Examples: spec change detected, application submitted etc. was (Author: thw): This will add events that will provide more visibility to the user. Examples: spec change detected, application submitted etc. > Emit events for important Deployment / Job changes > -- > > Key: FLINK-26891 > URL: https://issues.apache.org/jira/browse/FLINK-26891 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Thomas Weise >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > We should try capturing the important deployment states, such as RUNNING, > FAILING, DEPLOYING -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-26891) Emit events for important Deployment / Job changes
[ https://issues.apache.org/jira/browse/FLINK-26891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17558113#comment-17558113 ] Thomas Weise commented on FLINK-26891: -- This will add events that will provide more visibility to the user. Examples: spec change detected, application submitted etc. > Emit events for important Deployment / Job changes > -- > > Key: FLINK-26891 > URL: https://issues.apache.org/jira/browse/FLINK-26891 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Thomas Weise >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > We should try capturing the important deployment states, such as RUNNING, > FAILING, DEPLOYING -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (FLINK-26891) Emit events for important Deployment / Job changes
[ https://issues.apache.org/jira/browse/FLINK-26891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-26891: Assignee: Thomas Weise (was: Matyas Orhidi) > Emit events for important Deployment / Job changes > -- > > Key: FLINK-26891 > URL: https://issues.apache.org/jira/browse/FLINK-26891 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Thomas Weise >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > We should try capturing the important deployment states, such as RUNNING, > FAILING, DEPLOYING -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (FLINK-27101) Periodically break the chain of incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-27101: Assignee: Jiale Tan > Periodically break the chain of incremental checkpoint > -- > > Key: FLINK-27101 > URL: https://issues.apache.org/jira/browse/FLINK-27101 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing >Reporter: Steven Zhen Wu >Assignee: Jiale Tan >Priority: Major > > Incremental checkpointing is almost a must for large-state jobs. It greatly > reduces the bytes uploaded to DFS per checkpoint. However, incremental checkpointing has a few > implications that are problematic for production > operations. We will use S3 as the example DFS for the rest of the description. > 1. Because there is no way to deterministically know how far back an > incremental checkpoint can refer to files uploaded to S3, it is very > difficult to set an S3 bucket/object TTL. In one application, we have observed > Flink checkpoints referring to files uploaded over 6 months ago. An S3 TTL can > corrupt the Flink checkpoints. > S3 TTL is important for a few reasons: > - Purging orphaned files (like external checkpoints from previous deployments) > to keep the storage cost in check. This problem can be addressed by > implementing proper garbage collection (similar to the JVM) by traversing the > retained checkpoints from all jobs and traversing the file references. But that > is an expensive solution from an engineering cost perspective. > - Security and privacy. E.g., there may be a requirement that Flink state can't > keep data for longer than some duration threshold (hours/days/weeks). > The application is expected to purge keys to satisfy the requirement. However, > with incremental checkpoints and the way deletion works in RocksDB, it is hard to > set an S3 TTL to purge S3 files. Even though those old S3 files don't contain > live keys, they may still be referenced by retained Flink checkpoints. > 2.
Occasionally, corrupted checkpoint files (on S3) are observed. As a > result, restoring from the checkpoint fails. With incremental checkpoints, it > usually doesn't help to try other, older checkpoints, because they may refer > to the same corrupted file. It is unclear whether the corruption happened > before or during the S3 upload. This risk can be mitigated with periodic > savepoints. > It all boils down to a periodic full snapshot (checkpoint or savepoint) to > deterministically break the chain of incremental checkpoints. In the Jira > history, the behavior that FLINK-23949 [1] tries to fix is actually close to > what we need here. > There are a few options: > 1. Periodically trigger savepoints (via the control plane). This is actually not > a bad practice and might be appealing to some people. The problem is that it > requires a job deployment to break the chain of incremental checkpoints, and > periodic job deployments may sound hacky. If we make the behavior of a full > checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be > an acceptable compromise. The benefit is that no job deployment is required > after savepoints. > 2. Build the feature into Flink incremental checkpointing. Periodically (with some > cron-style config) trigger a full checkpoint to break the incremental chain. > If the full checkpoint fails (for whatever reason), the following > checkpoints should also attempt a full checkpoint until one full > checkpoint completes successfully. > 3. For the security/privacy requirement, the main thing is to apply > compaction on the deleted keys. That could probably avoid references to the > old files. Is there any RocksDB compaction that can achieve a full compaction, > removing old delete markers? Recent delete markers are fine. > [1] https://issues.apache.org/jira/browse/FLINK-23949 -- This message was sent by Atlassian Jira (v8.20.7#820007)
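Option 2 implies a small piece of state: remember when the last *successful* full checkpoint happened, and keep requesting full checkpoints until the next one succeeds. Below is a minimal sketch of that policy. It assumes a fixed interval instead of the cron-style config mentioned in the description. FullCheckpointPolicy and its methods are hypothetical and are not part of Flink's checkpointing API.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical policy for option 2 (periodic full checkpoints); not a Flink API. */
public class FullCheckpointPolicy {
    private final Duration fullInterval;
    private Instant lastSuccessfulFull;

    public FullCheckpointPolicy(Duration fullInterval, Instant jobStart) {
        this.fullInterval = fullInterval;
        // Assumption: treat job start as the beginning of the first incremental chain.
        this.lastSuccessfulFull = jobStart;
    }

    /** True if the next checkpoint should be a full (non-incremental) one. */
    public boolean nextShouldBeFull(Instant now) {
        return !now.isBefore(lastSuccessfulFull.plus(fullInterval));
    }

    /**
     * Only a successful full checkpoint resets the clock, so after a failed full
     * checkpoint nextShouldBeFull keeps returning true until one succeeds.
     */
    public void onCheckpointFinished(boolean wasFull, boolean succeeded, Instant finishedAt) {
        if (wasFull && succeeded) {
            lastSuccessfulFull = finishedAt;
        }
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2022-01-01T00:00:00Z");
        FullCheckpointPolicy policy = new FullCheckpointPolicy(Duration.ofHours(1), t0);
        Instant t1 = t0.plus(Duration.ofMinutes(61));
        System.out.println(policy.nextShouldBeFull(t1)); // interval elapsed
        policy.onCheckpointFinished(true, false, t1); // the full checkpoint failed
        System.out.println(policy.nextShouldBeFull(t1.plusSeconds(60))); // so retry full
        policy.onCheckpointFinished(true, true, t1.plusSeconds(60));
        System.out.println(policy.nextShouldBeFull(t1.plusSeconds(120))); // chain broken
    }
}
```

Keying the timer on the last *successful* full checkpoint is what makes the "keep trying until one succeeds" rule fall out without a separate retry flag.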
[jira] [Assigned] (FLINK-27788) Adding annotation to k8 operator Pod
[ https://issues.apache.org/jira/browse/FLINK-27788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-27788: Assignee: Jaganathan Asokan > Adding annotation to k8 operator Pod > > > Key: FLINK-27788 > URL: https://issues.apache.org/jira/browse/FLINK-27788 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-0.1.0, kubernetes-operator-1.1.0 >Reporter: Jaganathan Asokan >Assignee: Jaganathan Asokan >Priority: Minor > > Currently we lack the option to natively add annotations to Flink operator > pods. Providing this feature directly in our existing Helm chart could be > useful. One potential use case for allowing annotations on the Pod is to enable > scraping of operator metrics by monitoring infrastructure such as Prometheus, > Datadog, etc. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27788) Adding annotation to k8 operator Pod
[ https://issues.apache.org/jira/browse/FLINK-27788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27788: - Fix Version/s: (was: kubernetes-operator-0.1.0) (was: kubernetes-operator-1.1.0) > Adding annotation to k8 operator Pod > > > Key: FLINK-27788 > URL: https://issues.apache.org/jira/browse/FLINK-27788 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-0.1.0, kubernetes-operator-1.1.0 >Reporter: Jaganathan Asokan >Priority: Minor > > Currently we lack the option to natively add annotations to Flink operator > pods. Providing this feature directly in our existing Helm chart could be > useful. One potential use case for allowing annotations on the Pod is to enable > scraping of operator metrics by monitoring infrastructure such as Prometheus, > Datadog, etc. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27650) First environment variable of top level pod template is lost
[ https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537773#comment-17537773 ] Thomas Weise commented on FLINK-27650: -- That's as per [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L83]. Maybe it is better to just override? This isn't a "blocker" though. > First environment variable of top level pod template is lost > > > Key: FLINK-27650 > URL: https://issues.apache.org/jira/browse/FLINK-27650 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-0.1.0 >Reporter: Simon Paradis >Priority: Major > Fix For: kubernetes-operator-1.0.0 > > Attachments: flink-27650.yaml > > > I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* > to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template > feature to inject environment variables that control structured JSON logging. > I noticed that the first defined environment variable is never injected into either the > JobManager or the TaskManager pods. The workaround is to define a dummy > environment variable. > Here's the manifest template. It gets processed by a tool that first > expands ${ENV_VAR} references with values provided by our CI pipeline. We > should not have to create the FLINK_COORDINATES_DUMMY env var. > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
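The comment asks whether the merge should simply override. One way to make "override" behavior explicit, with no variable silently dropped, is to merge environment variables by name so that entries from the second list replace same-named entries from the first. Below is a minimal sketch that assumes a hypothetical EnvVar record (Java 16+) rather than the Kubernetes model classes the operator actually uses. It does not reproduce the logic at the linked FlinkUtils line.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical name-keyed merge of container env vars; not the operator's FlinkUtils code. */
public class EnvMerge {
    /** Hypothetical stand-in for a Kubernetes EnvVar (name/value only). */
    record EnvVar(String name, String value) {}

    /**
     * Keeps every variable from both lists. When a name appears in both, the override
     * value wins but the variable keeps its original position.
     */
    static List<EnvVar> mergeByName(List<EnvVar> base, List<EnvVar> overrides) {
        Map<String, EnvVar> merged = new LinkedHashMap<>();
        for (EnvVar env : base) {
            merged.put(env.name(), env);
        }
        for (EnvVar env : overrides) {
            // LinkedHashMap keeps the insertion order of an existing key when its value is replaced.
            merged.put(env.name(), env);
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        // Hypothetical variable names, for illustration only.
        List<EnvVar> first = List.of(new EnvVar("LOG_FORMAT", "json"), new EnvVar("A", "1"));
        List<EnvVar> second = List.of(new EnvVar("A", "2"), new EnvVar("B", "3"));
        System.out.println(mergeByName(first, second));
    }
}
```

Because every input entry ends up in the map, the first variable of either list cannot be lost, which is the symptom reported in this issue.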
[jira] [Updated] (FLINK-27650) First environment variable of top level pod template is lost
[ https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27650: - Priority: Major (was: Blocker) > First environment variable of top level pod template is lost > > > Key: FLINK-27650 > URL: https://issues.apache.org/jira/browse/FLINK-27650 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-0.1.0 >Reporter: Simon Paradis >Priority: Major > Fix For: kubernetes-operator-1.0.0 > > Attachments: flink-27650.yaml > > > I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* > to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template > feature to inject environment variables that control structured JSON logging. > I noticed that the first defined environment variable is never injected into either the > JobManager or the TaskManager pods. The workaround is to define a dummy > environment variable. > Here's the manifest template. It gets processed by a tool that first > expands ${ENV_VAR} references with values provided by our CI pipeline. We > should not have to create the FLINK_COORDINATES_DUMMY env var. > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27465) AvroRowDeserializationSchema.convertToTimestamp fails with negative nano seconds
[ https://issues.apache.org/jira/browse/FLINK-27465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27465: - Fix Version/s: 1.15.1 > AvroRowDeserializationSchema.convertToTimestamp fails with negative nano > seconds > > > Key: FLINK-27465 > URL: https://issues.apache.org/jira/browse/FLINK-27465 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.15.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.1 > > > The issue is exposed by a time zone dependency in > AvroRowDeSerializationSchemaTest. > > The root cause is that convertToTimestamp attempts to set a negative value with > java.sql.Timestamp.setNanos. -- This message was sent by Atlassian Jira (v8.20.7#820007)
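java.sql.Timestamp.setNanos rejects values outside 0 to 999,999,999. A negative value can arise, for example, when the sub-second part of a pre-1970 value is computed with %, which keeps the sign of the dividend. Below is a minimal sketch of the failure and of a floorDiv/floorMod conversion that avoids it. It assumes the input is epoch microseconds (as with Avro's timestamp-micros logical type). Both helpers are hypothetical and are not the code of convertToTimestamp.

```java
import java.sql.Timestamp;

/** Hypothetical conversion helpers; not the actual AvroRowDeserializationSchema code. */
public class MicrosToTimestamp {
    /** Naive version: % keeps the sign, so pre-1970 inputs yield a negative nanos value. */
    static Timestamp naiveFromEpochMicros(long micros) {
        Timestamp ts = new Timestamp(micros / 1000L);
        ts.setNanos((int) (micros % 1_000_000L) * 1000); // throws IllegalArgumentException if negative
        return ts;
    }

    /** floorDiv/floorMod keep the sub-second part in [0, 999_999] microseconds. */
    static Timestamp fromEpochMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long microsOfSecond = Math.floorMod(micros, 1_000_000L);
        Timestamp ts = new Timestamp(seconds * 1000L);
        ts.setNanos((int) (microsOfSecond * 1000L));
        return ts;
    }

    public static void main(String[] args) {
        // One microsecond before the epoch.
        Timestamp ts = fromEpochMicros(-1L);
        System.out.println(ts.getTime() + " ms, nanos=" + ts.getNanos());
        try {
            naiveFromEpochMicros(-1L);
        } catch (IllegalArgumentException e) {
            System.out.println("naive conversion failed: " + e.getMessage());
        }
    }
}
```

Flooring moves the borrow into the whole-second part, so a value just before the epoch becomes "one second earlier plus 999,999,000 ns" instead of a negative nanosecond field.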
[jira] [Updated] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27255: - Fix Version/s: 1.15.1 > Flink-avro does not support serialization and deserialization of avro schema > longer than 65535 characters > - > > Key: FLINK-27255 > URL: https://issues.apache.org/jira/browse/FLINK-27255 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.14.4 >Reporter: Haizhou Zhao >Assignee: Haizhou Zhao >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.14.5, 1.15.1 > > > The underlying serialization of the Avro schema uses a string serialization method > of ObjectOutputStream.class; however, the default string serialization by > ObjectOutputStream.class does not support strings of more than 65535 > characters (64 KB). As a result, constructing Flink operators that > input/output Avro GenericRecords with a huge schema is not possible. > > The proposed fix is to change the serialization and deserialization methods > of the following classes so that huge strings can also be handled. > > [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107] > [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55] > -- This message was sent by Atlassian Jira (v8.20.7#820007)
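DataOutput.writeUTF, which ObjectOutputStream implements, writes a 2-byte length prefix. It therefore throws UTFDataFormatException when the modified UTF-8 encoding of the string exceeds 65535 bytes. Below is a minimal sketch of a length-prefixed alternative that uses an int length. It assumes the schema string is currently written with writeUTF. writeLongString and readLongString are hypothetical helpers and not necessarily what the fix for GenericRecordAvroTypeInfo and SerializableAvroSchema does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helpers for strings longer than writeUTF's 65535-byte limit. */
public class LongStringIO {
    /** Writes an int byte count followed by the UTF-8 bytes. */
    static void writeLongString(DataOutput out, String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    /** Reads a string written by writeLongString. */
    static String readLongString(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String hugeSchema = "x".repeat(70_000); // stand-in for a large Avro schema JSON string
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            // out.writeUTF(hugeSchema) would throw UTFDataFormatException here.
            writeLongString(out, hugeSchema);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            System.out.println(readLongString(in).length());
        }
    }
}
```

An alternative is ObjectOutputStream.writeObject(String), which switches to a long-string encoding when needed. An explicit int prefix keeps the wire format simple and independent of Java serialization details.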
[jira] [Commented] (FLINK-27594) Only recover JM deployment if HA metadata available
[ https://issues.apache.org/jira/browse/FLINK-27594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17536266#comment-17536266 ] Thomas Weise commented on FLINK-27594: -- [~gyfora] are you saying that when no HA metadata is available and the upgrade mode is LAST_STATE, the operator should keep the deployment in an error state? I think that would be correct. When the upgrade mode is SAVEPOINT, can it go back to the last savepoint? I also think that with LAST_STATE we should pick either the last checkpoint or the last savepoint, whichever is more recent. > Only recover JM deployment if HA metadata available > --- > > Key: FLINK-27594 > URL: https://issues.apache.org/jira/browse/FLINK-27594 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Blocker > Fix For: kubernetes-operator-1.0.0 > > > This ticket is related to https://issues.apache.org/jira/browse/FLINK-27572 > The deployment recovery logic for lost jobmanager deployments currently simply performs > a restoreFromLasteSavepoint operation. > This is incorrect in cases where the HA metadata is not available, as it might > lead to accidentally restoring from an older state. > We should verify that HA metadata is present and simply perform a > deployOperation. Once we have this, we can make recovery default to > true for all versions.
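The decision discussed in this comment can be sketched as a small function. This is a hypothetical illustration, not operator code. The enum names, the Optional savepoint path, and the choice to stay in error when SAVEPOINT mode has no recorded savepoint are all assumptions. The sketch also does not model the "pick the more recent of checkpoint or savepoint" suggestion.

```java
import java.util.Optional;

/** Hypothetical mirror of the operator's stateless / savepoint / last-state upgrade modes. */
enum UpgradeMode { STATELESS, SAVEPOINT, LAST_STATE }

/** Hypothetical outcomes of the JobManager deployment recovery decision. */
enum RecoveryAction { REDEPLOY_FROM_HA_METADATA, RESTORE_FROM_SAVEPOINT, DEPLOY_STATELESS, KEEP_IN_ERROR }

final class JmRecoverySketch {
    private JmRecoverySketch() {}

    static RecoveryAction decide(UpgradeMode mode, boolean haMetadataAvailable, Optional<String> lastSavepoint) {
        if (mode == UpgradeMode.STATELESS) {
            // No state to protect: simply deploy again.
            return RecoveryAction.DEPLOY_STATELESS;
        }
        if (haMetadataAvailable) {
            // Plain redeploy: the new JobManager resumes from the checkpoint recorded in HA metadata.
            return RecoveryAction.REDEPLOY_FROM_HA_METADATA;
        }
        if (mode == UpgradeMode.SAVEPOINT && lastSavepoint.isPresent()) {
            return RecoveryAction.RESTORE_FROM_SAVEPOINT;
        }
        // LAST_STATE without HA metadata (or SAVEPOINT without any savepoint): do not guess,
        // since restoring from an older snapshot could silently roll the job back.
        return RecoveryAction.KEEP_IN_ERROR;
    }
}
```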
[jira] [Commented] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17536222#comment-17536222 ] Thomas Weise commented on FLINK-27255: -- [~Keathalin21] please also open a PR for 1.15.x > Flink-avro does not support serialization and deserialization of avro schema > longer than 65535 characters > - > > Key: FLINK-27255 > URL: https://issues.apache.org/jira/browse/FLINK-27255 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.14.4 >Reporter: Haizhou Zhao >Assignee: Haizhou Zhao >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.14.5
[jira] [Updated] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27255: - Fix Version/s: 1.14.5 > Flink-avro does not support serialization and deserialization of avro schema > longer than 65535 characters > - > > Key: FLINK-27255 > URL: https://issues.apache.org/jira/browse/FLINK-27255 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.14.4 >Reporter: Haizhou Zhao >Assignee: Haizhou Zhao >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.14.5
[jira] [Resolved] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-27255. -- Resolution: Fixed > Flink-avro does not support serialization and deserialization of avro schema > longer than 65535 characters > - > > Key: FLINK-27255 > URL: https://issues.apache.org/jira/browse/FLINK-27255 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.14.4 >Reporter: Haizhou Zhao >Assignee: Haizhou Zhao >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0
[jira] [Updated] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-27255: - Fix Version/s: 1.16.0 > Flink-avro does not support serialization and deserialization of avro schema > longer than 65535 characters > - > > Key: FLINK-27255 > URL: https://issues.apache.org/jira/browse/FLINK-27255 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.14.4 >Reporter: Haizhou Zhao >Assignee: Haizhou Zhao >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0
[jira] [Commented] (FLINK-27435) Kubernetes Operator keeps savepoint history
[ https://issues.apache.org/jira/browse/FLINK-27435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534609#comment-17534609 ] Thomas Weise commented on FLINK-27435: -- [~gyfora] [~wangyang0918] I'm considering adding an optional List to either JobStatus or SavepointInfo and updating it whenever a new savepoint is recorded in the observer. That would be complemented by purging savepoints based on time and count limit configuration parameters. WDYT? > Kubernetes Operator keeps savepoint history > --- > > Key: FLINK-27435 > URL: https://issues.apache.org/jira/browse/FLINK-27435 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Fix For: kubernetes-operator-1.0.0 > > > Currently the operator keeps track of the most recent savepoint that was > triggered through savepointTriggerNonce. In some cases it is necessary to > find older savepoints. For that, it would be nice if the operator could > optionally maintain a savepoint history (and perhaps also trigger disposal of > savepoints that fall out of the history). The maximum number of savepoints > retained could be configured by count and/or age.
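A minimal sketch of the proposed history. It keeps a list ordered oldest first and trims it by count and age each time a new savepoint is recorded. The trimmed entries are returned as candidates for disposal. SavepointEntry, SavepointHistory, and the rule of always retaining the newest entry are hypothetical; they are not the operator's actual SavepointInfo API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical history entry: where the savepoint was written and when it was triggered. */
final class SavepointEntry {
    final String location;
    final Instant triggeredAt;

    SavepointEntry(String location, Instant triggeredAt) {
        this.location = location;
        this.triggeredAt = triggeredAt;
    }
}

/** Hypothetical bounded savepoint history, ordered oldest first. */
final class SavepointHistory {
    private final List<SavepointEntry> entries = new ArrayList<>();
    private final int maxCount;
    private final Duration maxAge;

    SavepointHistory(int maxCount, Duration maxAge) {
        if (maxCount < 1) {
            throw new IllegalArgumentException("maxCount must be at least 1");
        }
        this.maxCount = maxCount;
        this.maxAge = maxAge;
    }

    /**
     * Appends a savepoint (assumed newer than all existing entries) and trims the history.
     * Returns the trimmed entries so a caller could dispose of them; the newest entry is never trimmed.
     */
    List<SavepointEntry> record(SavepointEntry savepoint, Instant now) {
        entries.add(savepoint);
        List<SavepointEntry> purged = new ArrayList<>();
        while (entries.size() > 1) {
            SavepointEntry oldest = entries.get(0);
            boolean overCount = entries.size() > maxCount;
            boolean overAge = Duration.between(oldest.triggeredAt, now).compareTo(maxAge) > 0;
            if (!overCount && !overAge) {
                break; // entries are oldest first, so every later entry is within both limits too
            }
            purged.add(entries.remove(0));
        }
        return purged;
    }

    List<SavepointEntry> entries() {
        return Collections.unmodifiableList(entries);
    }
}
```

Returning the trimmed entries instead of deleting anything keeps disposal optional. This matches the "perhaps also trigger disposal" wording in the issue.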