[jira] [Closed] (FLINK-29385) AddColumn in flink table store should check the duplicate field names
[ https://issues.apache.org/jira/browse/FLINK-29385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed FLINK-29385.
    Fix Version/s: table-store-0.3.0, table-store-0.2.1
       Resolution: Fixed

master: 98774161860055aef2113a5442ad63dcfe3ea9eb
release-0.2: 9f0acad62c7a21a547cab1312b775baa0b6ab4e4

> AddColumn in flink table store should check the duplicate field names
> ----------------------------------------------------------------------
>
>                 Key: FLINK-29385
>                 URL: https://issues.apache.org/jira/browse/FLINK-29385
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>    Affects Versions: table-store-0.3.0
>            Reporter: Shammon
>            Assignee: Shammon
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: table-store-0.3.0, table-store-0.2.1
>
> AddColumn in the table store should check for duplicate field names. Otherwise the
> DDL succeeds, but creating the Flink store table later fails when a Flink job reads it.
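For context, the merged check reduces to something like the sketch below — a minimal sketch assuming the `DataField`/`AddColumn` names that appear in the PR review threads later in this digest, not the verbatim committed code:

{code:java}
import java.util.List;

public class AddColumnValidation {

    // Reject an AddColumn whose name collides with an existing field, so the
    // failure surfaces at DDL time instead of when a Flink job reads the table.
    static void checkNoDuplicateField(List<DataField> newFields, AddColumn addColumn) {
        boolean exists =
                newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()));
        if (exists) {
            throw new IllegalArgumentException(
                    String.format(
                            "The column [%s] already exists in the table.",
                            addColumn.fieldName()));
        }
    }
}
{code}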
[GitHub] [flink-table-store] JingsongLi merged pull request #302: [FLINK-29385] Verify duplicate column names for AddColumn
JingsongLi merged PR #302: URL: https://github.com/apache/flink-table-store/pull/302
[GitHub] [flink] RyanSkraba commented on pull request #20805: [FLINK-29198][test] Fail after maximum RetryOnException
RyanSkraba commented on PR #20805: URL: https://github.com/apache/flink/pull/20805#issuecomment-1254602564

> Thanks for the contribution. I think it would be great to have a test here confirming that the change fixes the issue

Thanks Sergey, especially for the collaboration! This was a tricky thing to test; it gets a bit unclear how far to go when testing test code :D But since this is such a widely used test extension, it's good to have extra coverage -- especially when the symptom is otherwise silently skipping tests that should have failed.
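For readers who haven't used the extension under test, usage looks roughly like this — a hedged sketch from memory of Flink's test utilities (`RetryExtension` and `@RetryOnException`); the bug being fixed was that exhausting the retry budget could silently skip the test instead of failing it:

```java
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;

@ExtendWith(RetryExtension.class)
class FlakyConnectionTest {

    // The test is re-run when an IOException escapes; after FLINK-29198 the
    // run must fail (not be skipped) once the maximum number of retries is hit.
    @TestTemplate
    @RetryOnException(times = 3, exception = IOException.class)
    void testAgainstFlakyService() throws IOException {
        // ... exercise code that may transiently throw IOException ...
    }
}
```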
[jira] [Updated] (FLINK-29380) Two streams union, watermark error, not the minimum value
[ https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiechenling updated FLINK-29380:
    Attachment: screenshot-2.png

> Two streams union, watermark error, not the minimum value
> ----------------------------------------------------------
>
>                 Key: FLINK-29380
>                 URL: https://issues.apache.org/jira/browse/FLINK-29380
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: xiechenling
>            Priority: Blocker
>             Fix For: 1.16.0, 1.15.3
>
>         Attachments: image-2022-09-21-17-59-01-846.png, screenshot-1.png, screenshot-2.png
>
> After a union of two streams, the watermark is wrong: it is not the minimum of the
> two input watermarks (the connect operator's watermark is correct).
> !image-2022-09-21-17-59-01-846.png!
> This looks related to watermark idleness. In Flink 1.13.1 the watermark is correct
> whether or not idleness is set. In Flink 1.15.2 the watermark is correct when idleness
> is not set or is set to 1000 ms, but wrong when idleness is set to 1 ms.
> !screenshot-1.png!

{code:scala}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.connector.source.Source
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, ZoneId}
import java.util

object UnionWaterMarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration)
    env.setParallelism(2)

    val numberSequenceSource: Source[Long, NumberSequenceSource.NumberSequenceSplit, util.Collection[NumberSequenceSource.NumberSequenceSplit]] =
      new NumberSequenceSource(0L, 1L)
        .asInstanceOf[Source[Long, NumberSequenceSource.NumberSequenceSplit, util.Collection[NumberSequenceSource.NumberSequenceSplit]]]

    val stream1 = env.fromSource(
      numberSequenceSource,
      WatermarkStrategy
        .forMonotonousTimestamps[Long]()
        .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
          override def extractTimestamp(element: Long, recordTimestamp: Long): Long =
            Instant.now().toEpochMilli
        }),
      "source"
    )

    val idleMillis = 1L
    val stream2 = env.fromSource(
      numberSequenceSource,
      WatermarkStrategy
        .forMonotonousTimestamps[Long]()
        .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
          override def extractTimestamp(element: Long, recordTimestamp: Long): Long =
            Instant.now().toEpochMilli - (1000L * 60L * 60L)
        })
        .withIdleness(Duration.ofMillis(idleMillis)),
      "source"
    )

    stream1
      .process(new PrintWatermarkProcess("stream1"))
      .returns(classOf[Long])
      .startNewChain()
      .union(
        stream2
          .process(new PrintWatermarkProcess("stream2"))
          .returns(classOf[Long])
          .startNewChain()
          .process(new PrintWatermarkProcess("stream3"))
          .returns(classOf[Long])
          .startNewChain()
      )
      .process(new PrintWatermarkProcess("union"))
      .returns(classOf[Long])
      .filter(value => false)
      .print()

    env.execute()
  }
}

class PrintWatermarkProcess(operatorName: String) extends ProcessFunction[Long, Long] {
  override def processElement(value: Long, ctx: ProcessFunction[Long, Long]#Context, out: Collector[Long]): Unit = {
    out.collect(value)
    val watermark = ctx.timerService().currentWatermark()
    if (watermark > 0 && watermark < 2L) {
      Instant.ofEpochMilli(watermark)
      val datetimeStr =
        DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault()).format(Instant.ofEpochMilli(watermark))
      // println(operatorName + " " + datetimeStr)
    }
  }
}
{code}
[jira] [Updated] (FLINK-29380) Two streams union, watermark error, not the minimum value
[ https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiechenling updated FLINK-29380:
    Description: added the !screenshot-2.png! reference; the rest of the description, including the Scala reproduction code, is unchanged from the version quoted above.
[jira] [Updated] (FLINK-29380) Two streams union, watermark error, not the minimum value
[ https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiechenling updated FLINK-29380:
    Description: added the Scala reproduction code after the !screenshot-1.png! reference (both shown in full in the update above).
[GitHub] [flink-table-store] zjureel commented on pull request #302: [FLINK-29385] Verify duplicate column names for AddColumn
zjureel commented on PR #302: URL: https://github.com/apache/flink-table-store/pull/302#issuecomment-1254600956

Thanks @JingsongLi @SteNicholas, I have updated the code.
[jira] [Updated] (FLINK-29380) Two streams union, watermark error, not the minimum value
[ https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiechenling updated FLINK-29380:
    Description: added the paragraph on watermark idleness (Flink 1.13.1 vs 1.15.2 behavior) and the !screenshot-1.png! reference.
[jira] [Updated] (FLINK-29380) Two streams union, watermark error, not the minimum value
[ https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiechenling updated FLINK-29380:
    Attachment: screenshot-1.png
[jira] [Commented] (FLINK-29349) Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
[ https://issues.apache.org/jira/browse/FLINK-29349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608100#comment-17608100 ]

lincoln lee commented on FLINK-29349:
-------------------------------------

[~lsy] I took a look at FLINK-22956; there is indeed overlap. Jingsong tried to remove timers from all over-aggregate functions, but it looks like most of the operators have semantic dependencies on timers, and that PR was closed. Btw, FLINK-22956 does not seem strongly related to its umbrella ticket, which aims to 'support changelog inputs for event time operators'. [~lzljs3620320], do you have a plan to continue FLINK-22956? If not, shall we keep this new issue (which only modifies the proctime unbounded over-aggregate operator) and close the former one (FLINK-22956)?

> Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-29349
>                 URL: https://issues.apache.org/jira/browse/FLINK-29349
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: lincoln lee
>            Priority: Major
>             Fix For: 1.17.0
>
> Currently we rely on timer-based state cleanup in the proctime over aggregate; this
> can be optimized to use state TTL, which is more efficient.
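The proposed change, in sketch form: instead of registering a cleanup timer per key, attach a TTL to the state descriptor and let the state backend expire entries. `StateTtlConfig` is the standard Flink API; the descriptor and state name below are illustrative, not the actual operator code:

{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlCleanupSketch {

    public static ValueStateDescriptor<Long> accumulatorDescriptor(long retentionMillis) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("over-agg-accumulator", Types.LONG);

        // State expires retentionMillis after the last write, and expired values
        // are never returned — so no per-key cleanup timer (and its extra state)
        // is needed.
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.milliseconds(retentionMillis))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
{code}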
[GitHub] [flink-table-store] zjureel commented on a diff in pull request #302: [FLINK-29385] Verify duplicate column names for AddColumn
zjureel commented on code in PR #302: URL: https://github.com/apache/flink-table-store/pull/302#discussion_r977248865

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
@@ -173,6 +174,15 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
                 newOptions.remove(removeOption.key());
             } else if (change instanceof AddColumn) {
                 AddColumn addColumn = (AddColumn) change;
+                Set<String> fieldNames =

Review Comment:
> Why not directly filter the field name of add column in the stream of `newFields`?

Good question. The data types of the two columns may be different, so I think it's better to return an explicit error message to the user.
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #301: [FLINK-29367] FileStoreCommitImpl also checks for conflicts when committing append changes
JingsongLi commented on code in PR #301: URL: https://github.com/apache/flink-table-store/pull/301#discussion_r977245473

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
@@ -182,26 +187,61 @@ public void commit(ManifestCommittable committable, Map properti
             LOG.debug("Ready to commit\n" + committable.toString());
         }

+        Long safeLatestSnapshotId = null;
+        List<ManifestEntry> entriesToCheck = new ArrayList<>();
+
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), FileKind.ADD);
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
+
         if (createEmptyCommit || !appendChanges.isEmpty()) {
+            // Optimization for common path.
+            // Step 1:
+            // Read manifest entries from changed partitions here and check for conflicts.
+            // If there are no other jobs committing at the same time,
+            // we can skip conflict checking in tryCommit method.
+            // This optimization is mainly used to decrease the number of times we read from files.
+            Long latestSnapshotId = snapshotManager.latestSnapshotId();
+            if (latestSnapshotId != null) {
+                // it is possible that some partitions only have compact changes,
+                // so we need to contain all changes
+                entriesToCheck.addAll(
+                        readAllEntriesFromChangedPartitions(
+                                latestSnapshotId, appendChanges, compactChanges));
+                entriesToCheck.addAll(appendChanges);
+                noConflictsOrFail(entriesToCheck);

Review Comment:
It would be better for the exception to explain when a conflict can occur and what the user should do about it.
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #301: [FLINK-29367] FileStoreCommitImpl also checks for conflicts when committing append changes
JingsongLi commented on code in PR #301: URL: https://github.com/apache/flink-table-store/pull/301#discussion_r977242850

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
@@ -479,32 +526,39 @@ private boolean tryCommitOnce(
             return false;
         }

-    private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
+    @SafeVarargs
+    private final List<ManifestEntry> readAllEntriesFromChangedPartitions(
+            long snapshotId, List<ManifestEntry>... changes) {
         List changedPartitions =
-                changes.stream()
+                Arrays.stream(changes)
+                        .flatMap(Collection::stream)
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
-        List allEntries;
         try {
-            allEntries =
-                    new ArrayList<>(
-                            scan.withSnapshot(snapshotId)
-                                    .withPartitionFilter(changedPartitions)
-                                    .plan()
-                                    .files());
+            return scan.withSnapshot(snapshotId)
+                    .withPartitionFilter(changedPartitions)
+                    .plan()
+                    .files();
         } catch (Throwable e) {
-            throw new RuntimeException("Cannot determine if conflicts exist.", e);
+            throw new RuntimeException("Cannot read manifest entries from changed partitions.", e);
         }
+    }
+
+    private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
+        List<ManifestEntry> allEntries =
+                new ArrayList<>(readAllEntriesFromChangedPartitions(snapshotId, changes));
         allEntries.addAll(changes);
+        noConflictsOrFail(allEntries);
+    }

+    private void noConflictsOrFail(List<ManifestEntry> allEntries) {
         Collection mergedEntries;
         try {
             // merge manifest entries and also check if the files we want to delete are still there
             mergedEntries = ManifestEntry.mergeManifestEntries(allEntries);
         } catch (Throwable e) {
-            throw new RuntimeException(
-                    "File deletion conflicts detected! Give up committing compact changes.", e);
+            throw new RuntimeException("File deletion conflicts detected! Give up committing.", e);

Review Comment:
I think we can ignore the exception from `mergeManifestEntries` here; instead, we can explain that there is concurrent writing and print the change details.
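A hedged sketch of the richer error message being requested — the helper name and wording below are illustrative, not the table-store API:

```java
import java.util.List;
import java.util.stream.Collectors;

public class ConflictMessages {

    // Build an exception that says when the conflict can happen (a concurrent
    // writer committed between our snapshot read and our commit) and what to do,
    // instead of a bare "conflicts detected".
    static RuntimeException conflictFailure(long snapshotId, List<?> changes, Throwable cause) {
        String details =
                changes.stream().map(Object::toString).collect(Collectors.joining("\n  "));
        return new RuntimeException(
                "File deletion conflicts detected while committing on top of snapshot "
                        + snapshotId
                        + ". This usually means another job wrote to the same partitions "
                        + "concurrently; giving up this commit so it can be retried. "
                        + "Conflicting changes:\n  "
                        + details,
                cause);
    }
}
```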
[GitHub] [flink] lsyldliu commented on pull request #20246: [FLINK-28074][table-planner] show statistics details for DESCRIBE EXTENDED
lsyldliu commented on PR #20246: URL: https://github.com/apache/flink/pull/20246#issuecomment-1254579074

Thanks for your contribution, I will take a look as soon as possible.
[GitHub] [flink] wanglijie95 commented on pull request #20867: [FLINK-29372] Add suffix to all options that conflict with YAML
wanglijie95 commented on PR #20867: URL: https://github.com/apache/flink/pull/20867#issuecomment-1254576835

The following configuration options are missing:
- `kubernetes.jobmanager.cpu`
- `kubernetes.jobmanager.cpu.limit-factor`
- `kubernetes.taskmanager.cpu`
- `kubernetes.taskmanager.cpu.limit-factor`
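For context, the renaming pattern under discussion looks roughly like the sketch below, using Flink's `ConfigOptions` builder. The `.amount` suffix and the use of deprecated keys (rather than fallback keys) are assumptions about the chosen approach, not the final committed option:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class KubernetesCpuOptions {

    // "kubernetes.jobmanager.cpu" is a prefix of "kubernetes.jobmanager.cpu.limit-factor",
    // which standard YAML parsers reject (a key cannot be both a scalar and a mapping).
    // Adding a suffix and keeping the old key as a deprecated fallback preserves
    // backwards compatibility.
    public static final ConfigOption<Double> JOB_MANAGER_CPU =
            ConfigOptions.key("kubernetes.jobmanager.cpu.amount")
                    .doubleType()
                    .defaultValue(1.0)
                    .withDeprecatedKeys("kubernetes.jobmanager.cpu")
                    .withDescription("The number of cpus used by the JobManager.");
}
```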
[GitHub] [flink] flinkbot commented on pull request #20879: Using a new image to debug CI
flinkbot commented on PR #20879: URL: https://github.com/apache/flink/pull/20879#issuecomment-1254570892

## CI report:

* 2d13d9a1ee7e775bed2a47d4037159e587efc090 UNKNOWN

Bot commands: the @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #302: [FLINK-29385] Verify duplicate column names for AddColumn
JingsongLi commented on code in PR #302: URL: https://github.com/apache/flink-table-store/pull/302#discussion_r977230937

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
@@ -173,6 +174,15 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
                 newOptions.remove(removeOption.key());
             } else if (change instanceof AddColumn) {
                 AddColumn addColumn = (AddColumn) change;
+                Set<String> fieldNames =

Review Comment:
`Stream.anyMatch`
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #302: [FLINK-29385] Verify duplicate column names for AddColumn
SteNicholas commented on code in PR #302: URL: https://github.com/apache/flink-table-store/pull/302#discussion_r977219669

## flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java:
@@ -120,6 +122,28 @@ public void testAddField() throws Exception {
         assertThat(rows).containsExactlyInAnyOrder(Row.of(3, 3L, 3L), Row.of(4, 4L, 4L));
     }

+    @Test
+    public void testAddDuplicateField() throws Exception {
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        RowType.of(new IntType(), new BigIntType()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        schemaManager.commitNewVersion(updateSchema);
+        schemaManager.commitChanges(
+                Collections.singletonList(SchemaChange.addColumn("f3", new BigIntType())));
+        assertThatThrownBy(
+                        () -> {
+                            schemaManager.commitChanges(
+                                    Collections.singletonList(
+                                            SchemaChange.addColumn("f3", new FloatType())));
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageStartingWith("The column[f3] is exist in");

Review Comment:
Why not use `hasMessage` to verify the complete exception message?
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #302: [FLINK-29385] Verify duplicate column names for AddColumn
SteNicholas commented on code in PR #302: URL: https://github.com/apache/flink-table-store/pull/302#discussion_r977218529

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
@@ -173,6 +174,15 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
                 newOptions.remove(removeOption.key());
             } else if (change instanceof AddColumn) {
                 AddColumn addColumn = (AddColumn) change;
+                Set<String> fieldNames =
+                        newFields.stream().map(DataField::name).collect(Collectors.toSet());
+                if (fieldNames.contains(addColumn.fieldName())) {
+                    throw new IllegalArgumentException(
+                            "The column["

Review Comment:
```suggestion
String.format("The column [%s] already exists in the table [%s].", addColumn.fieldName(), tableRoot)
```
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #302: [FLINK-29385] Verify duplicate column names for AddColumn
SteNicholas commented on code in PR #302: URL: https://github.com/apache/flink-table-store/pull/302#discussion_r977217137

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
@@ -173,6 +174,15 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
                 newOptions.remove(removeOption.key());
             } else if (change instanceof AddColumn) {
                 AddColumn addColumn = (AddColumn) change;
+                Set<String> fieldNames =

Review Comment:
Why not directly filter the field name of the added column in the stream of `newFields`?
[GitHub] [flink-kubernetes-operator] sap1ens commented on a diff in pull request #370: [FLINK-29288] Make it possible to use job jars in the system classpath
sap1ens commented on code in PR #370: URL: https://github.com/apache/flink-kubernetes-operator/pull/370#discussion_r977209625

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
@@ -180,10 +180,6 @@ private Optional<String> validateJobSpec(
             return Optional.empty();
         }

-        if (StringUtils.isNullOrWhitespaceOnly(job.getJarURI())) {
-            return Optional.of("Jar URI must be defined");
-        }

Review Comment:
Hey @jeesmon, I've tweaked `AbstractFlinkService` to pass an empty noop jar; however, now I'm seeing an exception on the JobManager side:

```
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:68) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:52) ~[flink-dist-1.15.2.jar:1.15.2]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist-1.15.2.jar:1.15.2]
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:479) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:153) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:182) ~[flink-dist-1.15.2.jar:1.15.2]
```

My job config is the following:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-only-example-noop2
spec:
  deploymentName: basic-session-deployment-only-example
  job:
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    parallelism: 4
    upgradeMode: stateless
```

I've manually copied the jar file containing the StateMachineExample class to the /opt/flink/lib folder. After analyzing the code, it looks like Flink creates and uses a special classloader that only consults the uploaded jar and nothing else. Of course it can't find anything, since that jar is empty. What am I missing?
[jira] [Commented] (FLINK-13703) AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)
[ https://issues.apache.org/jira/browse/FLINK-13703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608063#comment-17608063 ]

Andrei Leib commented on FLINK-13703:
-------------------------------------

Just hit this issue as well when setting

{code}
avroOptionalGetters := true
{code}

in build.sbt. Looks like this is where it happens:

https://github.com/apache/flink/blob/b5cd9f34ab73fa69a3db5a09908c1aa954ed0597/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java#L2010-L2012

Will have a closer look when time permits.

> AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)
> -------------------------------------------------------------------------
>
>                 Key: FLINK-13703
>                 URL: https://issues.apache.org/jira/browse/FLINK-13703
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Alexander Fedulov
>            Priority: Minor
>
> There is an option to generate Avro sources that represent immutable objects (the
> `createSetters` option set to false) [1], [2]. Those objects still have full-arguments
> constructors and are handled correctly by Avro.
> `AvroTypeInfo` in Flink checks whether a class complies with the strict POJO
> requirements (including setters) and throws an IllegalStateException ("Expecting type
> to be a PojoTypeInfo") otherwise. Can this check be relaxed to provide better
> immutability support?
> Steps to reproduce:
> 1) Generate Avro sources from a schema using the `createSetters` option.
> 2) Use the generated class in
> `ConfluentRegistryAvroDeserializationSchema.forSpecific(GeneratedClass.class, schemaRegistryUrl)`
>
> [1] https://github.com/commercehub-oss/gradle-avro-plugin
> [2] https://avro.apache.org/docs/current/api/java/org/apache/avro/mojo/AbstractAvroMojo.html
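For reference, step 2 of the reproduction is the call that trips the POJO check — a hedged sketch where the generated class is left generic, since any Avro-generated specific record compiled with `createSetters` disabled triggers it:

{code:java}
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class AvroImmutableRepro {

    // With createSetters=false the generated class has no setters, so the POJO
    // analysis behind AvroTypeInfo rejects it with
    // IllegalStateException("Expecting type to be a PojoTypeInfo").
    static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> schemaFor(
            Class<T> generatedClass, String schemaRegistryUrl) {
        return ConfluentRegistryAvroDeserializationSchema.forSpecific(
                generatedClass, schemaRegistryUrl);
    }
}
{code}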
[GitHub] [flink] flinkbot commented on pull request #20878: [FLINK-29228][hive] Align the schema of the HiveServer2 getMetadata with JDBC
flinkbot commented on PR #20878: URL: https://github.com/apache/flink/pull/20878#issuecomment-1254480559

## CI report:

* 4a7c35a242f47b6345a2ab25bb1480741a9a2ee4 UNKNOWN

Bot commands: the @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #157: [FLINK-28906] Support windowing in AgglomerativeClustering
lindong28 commented on code in PR #157: URL: https://github.com/apache/flink-ml/pull/157#discussion_r977119140

## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java:
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.streaming.api.datastream.AllWindowedStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Utility class for operations related to the window. */

Review Comment:
nits: Utility class for operations related to {@link Windows}

## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java:
+/** Utility class for operations related to the window. */
+@Internal
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class WindowUtils {
+    /**
+     * Applies windowAll() and process() operation on the input stream.
+     *
+     * @param input The input data stream.
+     * @param windows The window that defines how input data would be sliced into batches.
+     * @param function The user defined process function.
+     */
+    public static SingleOutputStreamOperator windowAllProcess(
+            DataStream input, Windows windows, ProcessAllWindowFunction function) {
+        AllWindowedStream allWindowedStream;
+        if (windows == null) {
+            allWindowedStream = input.windowAll((WindowAssigner) EndOfStreamWindows.get());
+        } else if (windows instanceof CountTumblingWindows) {
+            long countWindowSize = ((CountTumblingWindows) wind
[jira] [Assigned] (FLINK-29385) AddColumn in flink table store should check the duplicate field names
[ https://issues.apache.org/jira/browse/FLINK-29385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-29385:
    Assignee: Shammon
[GitHub] [flink] fsk119 opened a new pull request, #20878: [FLINK-29228][hive] Align the schema of the HiveServer2 getMetadata with JDBC
fsk119 opened a new pull request, #20878: URL: https://github.com/apache/flink/pull/20878

Cherry-picks the fix.
[jira] [Updated] (FLINK-29385) AddColumn in flink table store should check the duplicate field names
[ https://issues.apache.org/jira/browse/FLINK-29385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-29385:
    Labels: pull-request-available (was: )
[GitHub] [flink-table-store] zjureel opened a new pull request, #302: [FLINK-29385] Verify duplicate column names for AddColumn
zjureel opened a new pull request, #302: URL: https://github.com/apache/flink-table-store/pull/302

When we execute an AddColumn DDL, we should check whether the column name already exists. Otherwise the DDL succeeds, but any job that reads this table fails when creating the table-store source.
[GitHub] [flink] fsk119 closed pull request #20790: [FLINK-29228][hive] Align the schema of the HiveServer2 getMetadata with JDBC
fsk119 closed pull request #20790: [FLINK-29228][hive] Align the schema of the HiveServer2 getMetadata with JDBC URL: https://github.com/apache/flink/pull/20790
[jira] [Commented] (FLINK-29349) Use state ttl instead of timer to clean up state in proctime unbounded over aggregate
[ https://issues.apache.org/jira/browse/FLINK-29349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608056#comment-17608056 ]

dalongliu commented on FLINK-29349:
-----------------------------------

Is this a duplicate of FLINK-22956?
[jira] [Commented] (FLINK-6573) Flink MongoDB Connector
[ https://issues.apache.org/jira/browse/FLINK-6573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608051#comment-17608051 ]

Leonard Xu commented on FLINK-6573:
-----------------------------------

I've assigned this ticket to [~jiabao.sun] as he has already started the work.

> Flink MongoDB Connector
> ------------------------
>
>                 Key: FLINK-6573
>                 URL: https://issues.apache.org/jira/browse/FLINK-6573
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Common
>    Affects Versions: 1.2.0
>        Environment: Linux Operating System, Mongo DB
>            Reporter: Nagamallikarjuna
>            Assignee: Jiabao Sun
>            Priority: Not a Priority
>              Labels: pull-request-available, stale-assigned
>        Attachments: image-2021-11-15-14-41-07-514.png
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> Hi Community,
> Currently we are using Flink in our project. We have a huge amount of data to process
> with Flink, and that data resides in MongoDB. We need parallel data connectivity
> between Flink and MongoDB for both reads and writes. We are planning to create this
> connector and contribute it to the community.
> I will share further details once I receive your feedback. Please let us know if you
> have any concerns.
[jira] [Assigned] (FLINK-6573) Flink MongoDB Connector
[ https://issues.apache.org/jira/browse/FLINK-6573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu reassigned FLINK-6573:
    Assignee: Jiabao Sun (was: ZhuoYu Chen)
[jira] [Comment Edited] (FLINK-27101) Periodically break the chain of incremental checkpoint (trigger checkpoints via REST API)
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606485#comment-17606485 ]

Jiale Tan edited comment on FLINK-27101 at 9/22/22 3:12 AM:
------------------------------------------------------------

Hi folks, I have [this|https://github.com/apache/flink/pull/20852] draft PR for option 3 as discussed above: ??Expose triggering checkpoint via CLI and/or REST API with some parameters to choose incremental/full checkpoint.?? The API and implementation are very similar to the savepoint trigger. I am new to contributing to Flink, so please let me know if I am heading in the right direction. If needed, I may start a small FLIP / dev mailing list discussion.

was (Author: JIRAUSER290356): the same comment, ending "If yes, may start a small FLIP / dev mailing list discussion".

> Periodically break the chain of incremental checkpoint (trigger checkpoints via REST API)
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27101
>                 URL: https://issues.apache.org/jira/browse/FLINK-27101
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Checkpointing, Runtime / REST
>            Reporter: Steven Zhen Wu
>            Assignee: Jiale Tan
>            Priority: Major
>              Labels: pull-request-available
>
> Incremental checkpointing is almost a must for large-state jobs. It greatly reduces
> the bytes uploaded to the DFS per checkpoint. However, incremental checkpoints have a
> few implications that are problematic for production operations. S3 is used as the
> example DFS in the rest of this description.
> 1. Because there is no way to deterministically know how far back an incremental
> checkpoint can refer to files uploaded to S3, it is very difficult to set an S3
> bucket/object TTL. In one application, we have observed a Flink checkpoint referring
> to files uploaded over 6 months ago. An S3 TTL can corrupt the Flink checkpoints.
> S3 TTL is important for a few reasons:
> - Purging orphaned files (like external checkpoints from previous deployments) to
> keep the storage cost in check. This problem can be addressed by implementing proper
> garbage collection (similar to the JVM's) that traverses the retained checkpoints of
> all jobs and their file references, but that is an expensive solution from an
> engineering-cost perspective.
> - Security and privacy. E.g., there may be a requirement that Flink state cannot keep
> data beyond some duration threshold (hours/days/weeks). The application is expected
> to purge keys to satisfy the requirement. However, with incremental checkpoints and
> how deletion works in RocksDB, it is hard to set an S3 TTL to purge S3 files. Even
> though those old S3 files contain no live keys, they may still be referenced by
> retained Flink checkpoints.
> 2. Occasionally, corrupted checkpoint files (on S3) are observed, and restoring from
> the checkpoint fails as a result. With incremental checkpoints, trying other, older
> checkpoints usually doesn't help, because they may refer to the same corrupted file.
> It is unclear whether the corruption happened before or during the S3 upload. This
> risk can be mitigated with periodic savepoints.
> It all boils down to a periodic full snapshot (checkpoint or savepoint) to
> deterministically break the chain of incremental checkpoints. Searching the Jira
> history, the behavior that FLINK-23949 [1] tries to fix is actually close to what we
> would need here.
> There are a few options:
> 1. Periodically trigger savepoints (via a control plane). This is actually not a bad
> practice and might be appealing to some people. The problem is that it requires a job
> deployment to break the chain of incremental checkpoints, and periodic job
> deployments may sound hacky. If we make the behavior of a full checkpoint after a
> savepoint (fixed in FLINK-23949) configurable, it might be an acceptable compromise.
> The benefit is that no job deployment is required after savepoints.
> 2. Build the feature into Flink's incremental checkpointing. Periodically (with some
> cron-style config) trigger a full checkpoint to break the incremental chain. If the
> full checkpoint fails (for whatever reason), the following checkpoints should attempt
> a full checkpoint as well, until one completes successfully.
> 3. For the security/privacy requirement, the main thing is to apply compaction on the
> deleted keys. That could probably avoid references to
[jira] [Comment Edited] (FLINK-27101) Periodically break the chain of incremental checkpoint (trigger checkpoints via REST API)
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606485#comment-17606485 ] Jiale Tan edited comment on FLINK-27101 at 9/22/22 3:11 AM: Hi folks, I got [this|https://github.com/apache/flink/pull/20852] draft PR for option 3 as discussed above: ??Expose triggering checkpoint via CLI and/or REST API with some parameters to choose incremental/full checkpoint.?? The API and implementation are very similar to the savepoint trigger. I am new to contributing to Flink, please let me know if I am on the right track. If yes, I may start a small FLIP / dev mailing list discussion was (Author: JIRAUSER290356): Hi folks, I got [this|https://github.com/apache/flink/pull/20852] draft PR for option 3 as discussed above: ??Expose triggering checkpoint via CLI and/or REST API with some parameters to choose incremental/full checkpoint.?? The API and implementation are very similar to the savepoint trigger. I am new to contributing to Flink, please let me know if I am on the right track. If yes, I will start a small FLIP / dev mailing list discussion > Periodically break the chain of incremental checkpoint (trigger checkpoints > via REST API) > - > > Key: FLINK-27101 > URL: https://issues.apache.org/jira/browse/FLINK-27101 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing, Runtime / REST >Reporter: Steven Zhen Wu >Assignee: Jiale Tan >Priority: Major > Labels: pull-request-available > > Incremental checkpoint is almost a must for large-state jobs. It greatly > reduces the bytes uploaded to DFS per checkpoint. However, there are a few > implications of incremental checkpoint that are problematic for production > operations. We will use S3 as the example DFS for the rest of the description. > 1. Because there is no way to deterministically know how far back an > incremental checkpoint can refer to files uploaded to S3, it is very > difficult to set S3 bucket/object TTL. In one application, we have observed > Flink checkpoints referring to files uploaded over 6 months ago. S3 TTL can > corrupt the Flink checkpoints. > S3 TTL is important for a few reasons > - purge orphaned files (like external checkpoints from previous deployments) > to keep the storage cost in check. This problem can be addressed by > implementing proper garbage collection (similar to the JVM's) that traverses the > retained checkpoints from all jobs and follows the file references. But that > is an expensive solution from an engineering cost perspective. > - Security and privacy. E.g., there may be a requirement that Flink state can't > keep the data for more than some duration threshold (hours/days/weeks). The > application is expected to purge keys to satisfy the requirement. However, > with incremental checkpoint and how deletion works in RocksDB, it is hard to > set S3 TTL to purge S3 files. Even though those old S3 files don't contain > live keys, they may still be referenced by retained Flink checkpoints. > 2. Occasionally, corrupted checkpoint files (on S3) are observed. As a > result, restoring from a checkpoint fails. With incremental checkpoint, it > usually doesn't help to try other, older checkpoints, because they may refer > to the same corrupted file. It is unclear whether the corruption happened > before or during the S3 upload. This risk can be mitigated with periodic > savepoints. > It all boils down to a periodic full snapshot (checkpoint or savepoint) to > deterministically break the chain of incremental checkpoints.
Searching the jira > history, the behavior that FLINK-23949 [1] is trying to fix is actually close to > what we would need here. > There are a few options > 1. Periodically trigger savepoints (via a control plane). This is actually not > a bad practice and might be appealing to some people. The problem is that it > requires a job deployment to break the chain of incremental checkpoints. > Periodic job deployment may sound hacky. If we make the behavior of a full > checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be > an acceptable compromise. The benefit is that no job deployment is required > after savepoints. > 2. Build the feature into Flink incremental checkpointing. Periodically (with some > cron-style config) trigger a full checkpoint to break the incremental chain. > If the full checkpoint fails (for whatever reason), the following > checkpoints should attempt a full checkpoint as well until one successful full > checkpoint is completed. > 3. For the security/privacy requirement, the main thing is to apply > compaction on the deleted keys. That could probably avoid references to
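For illustration, a hypothetical invocation of the endpoint discussed in the draft PR, mirroring the existing savepoint trigger API; the exact path and request body are assumptions based on the draft, not a released Flink API:

```sh
# Trigger a full checkpoint via REST (endpoint shape assumed from the draft PR).
curl -X POST http://localhost:8081/jobs/<job-id>/checkpoints \
  -H 'Content-Type: application/json' \
  -d '{"checkpointType": "FULL"}'

# As with savepoint triggering, the response would carry a trigger id that
# can then be polled for completion status.
```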
[jira] [Commented] (FLINK-29384) snakeyaml version 1.30 in flink-kubernetes-operator-1.2-SNAPSHOT-shaded.jar has vulnerabilities
[ https://issues.apache.org/jira/browse/FLINK-29384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608044#comment-17608044 ] Yang Wang commented on FLINK-29384: --- The {{org.yaml:snakeyaml:jar:1.30}} is introduced by {{io.fabric8:kubernetes-client:jar:5.12.3}}. I think we could use dependencyManagement to pin the version to 1.32, just like what we have done for {{com.fasterxml.jackson}}. > snakeyaml version 1.30 in flink-kubernetes-operator-1.2-SNAPSHOT-shaded.jar > has vulnerabilities > --- > > Key: FLINK-29384 > URL: https://issues.apache.org/jira/browse/FLINK-29384 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.2.0 >Reporter: James Busche >Priority: Major > > I did a twistlock scan of the current operator image from main, and it looks > good except that in flink-kubernetes-operator-1.2-SNAPSHOT-shaded.jar I'm > seeing 5 CVEs on snakeyaml. It looks like updating from 1.30 to 1.32 should fix > them, but I'm not sure how to bump that up, other than the > [NOTICES|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE#L65] > entry. > The 5 CVEs are: > [https://nvd.nist.gov/vuln/detail/CVE-2022-25857] > [https://nvd.nist.gov/vuln/detail/CVE-2022-38749] > [https://nvd.nist.gov/vuln/detail/CVE-2022-38751] > [https://nvd.nist.gov/vuln/detail/CVE-2022-38750] > [https://nvd.nist.gov/vuln/detail/CVE-2022-38752] > Resulting in 1 High (CVSS 7.5) and 4 Mediums (CVSS 6.5, 6.5, 5.5, 4)
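A minimal sketch of the dependencyManagement pin described above; which module of flink-kubernetes-operator it belongs in is an assumption:

```xml
<!-- Pin the transitive snakeyaml pulled in by kubernetes-client to a patched version. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.yaml</groupId>
      <artifactId>snakeyaml</artifactId>
      <version>1.32</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```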
[GitHub] [flink] leletan commented on pull request #20852: [FLINK-27101][checkpointing][rest] Add restful API to trigger checkpoints
leletan commented on PR #20852: URL: https://github.com/apache/flink/pull/20852#issuecomment-1254461989 @pnowojski @zentol Addressed the code comments (at least I thought I did) and generated the docs. Please take a look whenever you have time. Thanks!
[jira] [Created] (FLINK-29385) AddColumn in flink table store should check the duplicate field names
Shammon created FLINK-29385: --- Summary: AddColumn in flink table store should check the duplicate field names Key: FLINK-29385 URL: https://issues.apache.org/jira/browse/FLINK-29385 Project: Flink Issue Type: Bug Components: Table Store Affects Versions: table-store-0.3.0 Reporter: Shammon AddColumn in the table store should check for duplicate field names; otherwise the DDL will succeed but creating the flink store table will fail for the flink job
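A minimal sketch of the kind of check the ticket asks for; the class, method, and exception choices are illustrative, not the actual table store code:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class AddColumnValidation {
    /** Rejects an AddColumn request whose resulting schema repeats a field name. */
    public static void validateNoDuplicateFieldNames(List<String> fieldNames) {
        Set<String> seen = new HashSet<>();
        for (String name : fieldNames) {
            // Set#add returns false when the element is already present.
            if (!seen.add(name)) {
                throw new IllegalArgumentException("Duplicate field name: " + name);
            }
        }
    }
}
```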
[GitHub] [flink] fsk119 commented on a diff in pull request #20790: [FLINK-29228][hive] Align the schema of the HiveServer2 getMetadata with JDBC
fsk119 commented on code in PR #20790: URL: https://github.com/apache/flink/pull/20790#discussion_r977139059 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java: ## @@ -572,6 +595,107 @@ public void testGetInfo() throws Exception { } }

+@Test
+public void testExecuteStatementInSyncMode() throws Exception {
+    TCLIService.Client client = createClient();
+    TSessionHandle sessionHandle = client.OpenSession(new TOpenSessionReq()).getSessionHandle();
+    TOperationHandle operationHandle =
+            client.ExecuteStatement(new TExecuteStatementReq(sessionHandle, "SHOW CATALOGS"))
+                    .getOperationHandle();
+
+    assertThat(
+                    client.GetOperationStatus(new TGetOperationStatusReq(operationHandle))
+                            .getOperationState())
+            .isEqualTo(TOperationState.FINISHED_STATE);
+
+    RowSet rowSet =
+            RowSetFactory.create(
+                    client.FetchResults(
+                                    new TFetchResultsReq(
+                                            operationHandle,
+                                            TFetchOrientation.FETCH_NEXT,
+                                            Integer.MAX_VALUE))
+                            .getResults(),
+                    HIVE_CLI_SERVICE_PROTOCOL_V10);
+    Iterator<Object[]> iterator = rowSet.iterator();
+    List<List<Object>> actual = new ArrayList<>();
+    while (iterator.hasNext()) {
+        actual.add(new ArrayList<>(Arrays.asList(iterator.next())));
+    }
+    assertThat(actual).isEqualTo(Collections.singletonList(Collections.singletonList("hive")));
+}

Review Comment: I will fix this in the next update.
[GitHub] [flink] Sxnan commented on pull request #20862: [FLINK-29339][runtime] JobMasterPartitionTracker returns future to avoid blocking main thread
Sxnan commented on PR #20862: URL: https://github.com/apache/flink/pull/20862#issuecomment-125163 @flinkbot run azure
[jira] [Commented] (FLINK-29329) Checkpoint can not be triggered if encountering OOM
[ https://issues.apache.org/jira/browse/FLINK-29329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608037#comment-17608037 ] Yun Tang commented on FLINK-29329: -- I think the problem of checkpoints not being triggered anymore should be related to the [schedule timer|https://github.com/apache/flink/blob/b5cd9f34ab73fa69a3db5a09908c1aa954ed0597/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L168]. If you can reproduce this problem, I think you could use jmap to dump the job manager heap and see what happened to CheckpointCoordinator#timer. > Checkpoint can not be triggered if encountering OOM > --- > > Key: FLINK-29329 > URL: https://issues.apache.org/jira/browse/FLINK-29329 > Project: Flink > Issue Type: Bug >Reporter: Yuxin Tan >Priority: Major > Fix For: 1.13.7 > > Attachments: job-exceptions-1.txt > > > When writing a checkpoint, an OOM error is thrown. But the JM did not fail > and was restored, because I found the log "No master state to restore". > After that, the JM never makes checkpoints anymore. Currently the root cause is > not clear; maybe this is a bug, and we should deal with OOM or other > exceptions when making checkpoints.
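For reference, dumping the JobManager heap with standard JDK tooling might look like this; the process id and file name are placeholders:

```sh
# Dump only live objects from the JobManager JVM to a binary heap file.
jmap -dump:live,format=b,file=jobmanager.hprof <jm-pid>
# Open the .hprof in a heap analyzer (e.g. Eclipse MAT) and inspect
# CheckpointCoordinator#timer to see whether the scheduled timer is gone.
```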
[jira] [Updated] (FLINK-29367) Avoid manifest corruption for incorrect checkpoint recovery
[ https://issues.apache.org/jira/browse/FLINK-29367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29367: --- Labels: pull-request-available (was: ) > Avoid manifest corruption for incorrect checkpoint recovery > --- > > Key: FLINK-29367 > URL: https://issues.apache.org/jira/browse/FLINK-29367 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.2.0 >Reporter: Jingsong Lee >Assignee: Caizhi Weng >Priority: Blocker > Labels: pull-request-available > Fix For: table-store-0.3.0, table-store-0.2.1 > > > When the job runs to checkpoint N, if the user recovers from an old > checkpoint (such as checkpoint N-5), the sink of the current FTS will cause a > manifest corruption because duplicate files may be committed. > We should avoid such corruption, and the storage should be robust enough.
[GitHub] [flink-table-store] tsreaper opened a new pull request, #301: [FLINK-29367] FileStoreCommitImpl also checks for conflicts when committing append changes
tsreaper opened a new pull request, #301: URL: https://github.com/apache/flink-table-store/pull/301 When the job runs to checkpoint N, if the user recovers from an old checkpoint (such as checkpoint N-5), the sink of the current FTS will cause a manifest corruption because duplicate files may be committed. We should avoid such corruption, and the storage should be robust enough.
[GitHub] [flink] wuchong commented on a diff in pull request #20653: [FLINK-29020][docs] add document for CTAS feature
wuchong commented on code in PR #20653: URL: https://github.com/apache/flink/pull/20653#discussion_r976628561 ## docs/content/docs/dev/table/sql/create.md: ## @@ -513,6 +516,48 @@ If you provide no like options, `INCLUDING ALL OVERWRITING OPTIONS` will be used **NOTE** The `source_table` can be a compound identifier. Thus, it can be a table from a different catalog or database: e.g. `my_catalog.my_db.MyTable` specifies table `MyTable` from catalog `my_catalog` and database `my_db`; `my_db.MyTable` specifies table `MyTable` from the current catalog and database `my_db`. +### `AS` + +Tables can also be created and populated by the results of a query in one create-table-as-select (CTAS) statement. CTAS is the simplest and fastest way to create and insert data into a table with a single command. + +There are two parts in CTAS: the SELECT part can be any [SELECT query]({{< ref "docs/dev/table/sql/queries/overview" >}}) supported by Flink SQL, and the CREATE part takes the resulting schema from the SELECT part and creates the target table. Similar to `CREATE TABLE`, CTAS requires that the required options of the target table be specified in the WITH clause. + +Creating the target table of CTAS depends on Catalog, so if using the built-in memory Catalog, users must ensure that the table already exists in external storage. If using other catalogs such as hive Catalog, the target table will be created by Catalog automatically. Review Comment:

```suggestion
The table creation operation of CTAS depends on the target Catalog. For example, Hive Catalog creates the physical table in Hive automatically, while the in-memory catalog only registers the table metadata in the client's memory where the SQL is executed.
```

How about modifying it like this?
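To make the documented syntax concrete, a minimal CTAS statement could look like the following; the table names and the connector choice are placeholders:

```sql
-- Create the target table from the SELECT's schema and populate it in one statement.
CREATE TABLE my_ctas_table
WITH (
  'connector' = 'print'
)
AS SELECT id, name, age FROM source_table;
```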
[GitHub] [flink] lsyldliu commented on pull request #20869: [FLINK-29219][table] Fix CREATE TABLE AS statement blocks SQL client's execution
lsyldliu commented on PR #20869: URL: https://github.com/apache/flink/pull/20869#issuecomment-1254386573 @flinkbot run azure
[GitHub] [flink] flinkbot commented on pull request #20877: [hotfix] Make doc ide_setup.md format better for code formatting
flinkbot commented on PR #20877: URL: https://github.com/apache/flink/pull/20877#issuecomment-1254372949 ## CI report: * acffaacb43a79c5a22771e076e52af94bfa7b94c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] liuml07 opened a new pull request, #20877: [hotfix] Make doc ide_setup.md format better for code formatting
liuml07 opened a new pull request, #20877: URL: https://github.com/apache/flink/pull/20877 ## What is the purpose of the change Improve the formatting of the doc file ide_setup.md after e48abd38 ### Before: https://user-images.githubusercontent.com/159186/191633961-1cd7f135-34e5-4e5e-baf8-eba905499d89.png ### After: https://user-images.githubusercontent.com/159186/191633981-17e08c7e-7329-457b-aa1b-eabf7be2e086.png ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change is a trivial rework / code cleanup without any test coverage. ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] flinkbot commented on pull request #20876: [hotfix][docs] Fix typo in Kinesis Connector docs.
flinkbot commented on PR #20876: URL: https://github.com/apache/flink/pull/20876#issuecomment-1254277324 ## CI report: * 417ffb9e3b582ebc1c5741d04876f1ab33ab4b61 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] afedulov opened a new pull request, #20876: [hotfix][docs] Fix typo in Kinesis Connector docs.
afedulov opened a new pull request, #20876: URL: https://github.com/apache/flink/pull/20876 This is a trivial docs fix.
[jira] [Commented] (FLINK-18647) How to handle processing time timers with bounded input
[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607948#comment-17607948 ] Piotr Nowojski commented on FLINK-18647: [~dkapoor1], is there some other ticket for the {quote} Processing Time CEP is broken in minicluster {quote} that you are referring to? Apart from that, I would be afraid that even changing the behaviour of the minicluster to wait for timers before shutdown would be problematic, prolonging the tests. Keep in mind that, as a workaround in tests, you can keep your artificial source alive until some timer fires. > How to handle processing time timers with bounded input > --- > > Key: FLINK-18647 > URL: https://issues.apache.org/jira/browse/FLINK-18647 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.11.0 >Reporter: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-critical, auto-deprioritized-major, > stale-minor > > (most of this description comes from an offline discussion between me, > [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb]) > In the case of end of input (for example for bounded sources), all pending > (untriggered) processing time timers are ignored/dropped. In some cases this > is desirable, but for example for {{WindowOperator}} it means that the last > trailing window will not be triggered, causing an apparent data loss. > There are a couple of ideas that should be considered. > 1. Provide a way for users to decide what to do with such timers: cancel, > wait, trigger immediately. For example by overloading the existing methods: > {{ProcessingTimeService#registerTimer}} and > {{ProcessingTimeService#scheduleAtFixedRate}} in the following way: > {code:java} > ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, TimerAction timerAction); > enum TimerAction { >   CANCEL_ON_END_OF_INPUT, >   TRIGGER_ON_END_OF_INPUT, >   WAIT_ON_END_OF_INPUT > } > {code} > or maybe: > {code} > public interface TimerAction { >   void onEndOfInput(ScheduledFuture<?> timer); > } > {code} > But this would also mean we store additional state with each timer, we > need to modify the serialisation format (providing some kind of state > migration path), and we potentially increase the size footprint of the timers. > Extra overhead could have been avoided via some kind of {{Map<Timer, TimerAction>}}, with the lack of an entry meaning some default value. > 2. > Another way to solve this problem might be to let the operator code decide > what to do with the given timer. Either ask the operator what should happen > with a given timer (a), or let the operator iterate over and cancel the timers on > endOfInput() (b), or just fire the timer with some endOfInput flag (c). > I think none of (a), (b), and (c) would require breaking API changes, no > state changes and no additional overheads. Just the logic of what to do with the > timer would have to be “hardcoded” in the operator’s code. (Which btw might > even have an additional benefit of being easier to change in case of some > bugs, like a timer registered with a wrong/incorrect {{TimerAction}}.) > This is complicated a bit by the question of how (if at all) options a), b) or > c) should be exposed to UDFs? > 3. > Maybe we need a combination of both? Pre-existing operators could implement > some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be > handled by 1.?
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #375: [FLINK-29327] remove operator config from job runtime config before d…
gyfora commented on code in PR #375: URL: https://github.com/apache/flink-kubernetes-operator/pull/375#discussion_r976892681 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -182,7 +184,13 @@ public JobID submitJobToSessionCluster( throws Exception { // we generate jobID in advance to help deduplicate job submission. var jobID = FlinkUtils.generateSessionJobFixedJobID(meta); -runJar(spec.getJob(), jobID, uploadJar(meta, spec, conf), conf, savepoint); +Configuration runtimeConfig = removeOperatorConfigs(conf); +runJar( +spec.getJob(), +jobID, +uploadJar(meta, spec, runtimeConfig), +runtimeConfig, +savepoint); Review Comment: I looked at the code, and I think for session jobs we should not remove any configs. When we submit the jar through the REST API, the configuration is not passed. Removing operator configs here can actually break some artifact fetcher mechanisms within the operator. It seems we don't have very good test coverage that would catch this problem. I will open some tickets.
[GitHub] [flink-kubernetes-operator] HuangZhenQiu commented on a diff in pull request #375: [FLINK-29327] remove operator config from job runtime config before d…
HuangZhenQiu commented on code in PR #375: URL: https://github.com/apache/flink-kubernetes-operator/pull/375#discussion_r976868309 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -165,7 +165,8 @@ public void submitApplicationCluster( if (requireHaMetadata) { validateHaMetadataExists(conf); } -deployApplicationCluster(jobSpec, conf); + +deployApplicationCluster(jobSpec, removeOperatorConfigs(conf)); Review Comment: Thanks for the suggestions. Test cases are added in the latest revision.
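As a rough illustration of what a removeOperatorConfigs helper could do, here is a sketch that assumes operator-owned options share a common key prefix; the actual filtering logic in the PR may differ:

```java
import org.apache.flink.configuration.Configuration;

public final class OperatorConfigFilter {
    // Assumed prefix for operator-owned options; not necessarily exhaustive.
    private static final String OPERATOR_PREFIX = "kubernetes.operator.";

    /** Returns a copy of the configuration without operator-internal options. */
    public static Configuration removeOperatorConfigs(Configuration conf) {
        Configuration filtered = new Configuration();
        conf.toMap().forEach(
                (key, value) -> {
                    if (!key.startsWith(OPERATOR_PREFIX)) {
                        filtered.setString(key, value);
                    }
                });
        return filtered;
    }
}
```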
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #370: [FLINK-29288] Make it possible to use job jars in the system classpath
gyfora commented on code in PR #370: URL: https://github.com/apache/flink-kubernetes-operator/pull/370#discussion_r976840850 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java: ## @@ -87,4 +88,8 @@ public Boolean getAllowNonRestoredState() { } return null; } + +public Boolean isPipelineClasspathDefined() { +return flinkConfig.contains(PipelineOptions.CLASSPATHS); +} Review Comment: I agree that we should keep it, but did you look at the changes @usamj? The question is whether the change makes sense or not to support this feature
[jira] [Created] (FLINK-29384) snakeyaml version 1.30 in flink-kubernetes-operator-1.2-SNAPSHOT-shaded.jar has vulnerabilities
James Busche created FLINK-29384: Summary: snakeyaml version 1.30 in flink-kubernetes-operator-1.2-SNAPSHOT-shaded.jar has vulnerabilities Key: FLINK-29384 URL: https://issues.apache.org/jira/browse/FLINK-29384 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.2.0 Reporter: James Busche I did a twistlock scan of the current operator image from main, and it looks good except that in flink-kubernetes-operator-1.2-SNAPSHOT-shaded.jar I'm seeing 5 CVEs on snakeyaml. It looks like updating from 1.30 to 1.32 should fix them, but I'm not sure how to bump that up, other than the [NOTICES|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE#L65] entry. The 5 CVEs are: [https://nvd.nist.gov/vuln/detail/CVE-2022-25857] [https://nvd.nist.gov/vuln/detail/CVE-2022-38749] [https://nvd.nist.gov/vuln/detail/CVE-2022-38751] [https://nvd.nist.gov/vuln/detail/CVE-2022-38750] [https://nvd.nist.gov/vuln/detail/CVE-2022-38752] Resulting in 1 High (CVSS 7.5) and 4 Mediums (CVSS 6.5, 6.5, 5.5, 4)
[jira] [Comment Edited] (FLINK-18647) How to handle processing time timers with bounded input
[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607914#comment-17607914 ] Divye Kapoor edited comment on FLINK-18647 at 9/21/22 5:48 PM: --- Since this is an improvement ticket that covers scope outside of the immediate "bug" that Pinterest is concerned about (causing minicluster job termination before timers are fired), maybe we can split the work into 2 parts: 1. Fixing the bug in the minicluster in processing time (separate ticket). 2. The improvement to allow for different timer behavior for different use cases (this ticket). If that works for everyone and there are no objections to the change to fix (1), maybe we can move ahead with fixing (1) while we generate consensus on (2). FTR, we don't have any immediate known needs for per-operator early timer triggers on shutdown, so I'll reserve commenting on this ticket moving forward. Thanks for the interesting discussion [~pnowojski]. Tagging [~alibahadirzeybek] [~gaoyunhaii] and Karl for visibility. Thank you! was (Author: dkapoor1): Since this is an improvement ticket that covers scope outside of the immediate "bug" that Pinterest is concerned about (causing minicluster job termination before timers are fired), maybe we can split the work into 2 parts: 1. Fixing the bug in the minicluster in processing time (separate ticket). 2. The improvement to allow for different timer behavior for different use cases (this ticket). If that works for everyone and there are no objections to the change to fix (1), maybe we can move ahead with fixing (1) while we generate consensus on (2). FTR, we don't have any immediate known needs for per-operator early timer triggers on shutdown, so I'll reserve commenting on this ticket moving forward. Thanks for the interesting discussion [~pnowojski]. Thank you! > How to handle processing time timers with bounded input > --- > > Key: FLINK-18647 > URL: https://issues.apache.org/jira/browse/FLINK-18647 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.11.0 >Reporter: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-critical, auto-deprioritized-major, > stale-minor
[jira] [Commented] (FLINK-18647) How to handle processing time timers with bounded input
[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607914#comment-17607914 ] Divye Kapoor commented on FLINK-18647: -- Since this is an improvement ticket that covers scope outside of the immediate "bug" that Pinterest is concerned about (causing minicluster job termination before timers are fired), maybe we can split the work into 2 parts: 1. Fixing the bug in the minicluster in processing time (separate ticket). 2. The improvement to allow for different timer behavior for different use cases (this ticket). If that works for everyone and there are no objections to the change to fix (1), maybe we can move ahead with fixing (1) while we generate consensus on (2). FTR, we don't have any immediate known needs for per-operator early timer triggers on shutdown, so I'll reserve commenting on this ticket moving forward. Thanks for the interesting discussion [~pnowojski]. Looking forward to working closely with [~alibahadirzeybek], Karl and [~gaoyunhaii] to address (1). ([~pnowojski] - for context, this is a Pinterest support ticket to Ververica for (1), which is why we're having this discussion here.) Thank you! > How to handle processing time timers with bounded input > --- > > Key: FLINK-18647 > URL: https://issues.apache.org/jira/browse/FLINK-18647 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.11.0 >Reporter: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-critical, auto-deprioritized-major, > stale-minor
[jira] [Commented] (FLINK-18647) How to handle processing time timers with bounded input
[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607907#comment-17607907 ] Divye Kapoor commented on FLINK-18647: -- [~pnowojski] – if the only implementation were "option 2" above – the "correct" behavior in both testing and production (wait for timers before triggering EOF) – all of the use cases get resolved in the following manner: 1. mark end of some windowed aggregation – terminates in the normal manner. 2. CEP-style timeouts – terminate in the normal manner. 3. handle async timeouts – terminate in the normal manner. 4. clean state – cleans up in the normal manner. Re: 1-3: yes, waiting before EOF is inefficient in only 1 case – the testing case. In production, the wait is essential (because some other processing might be happening in the meanwhile outside the Flink job). Firing immediately is acceptable only in the testing case (otherwise it's a violation of guarantees). Re: 4 – > ideally should be dropped on EOF. Can be also fired or waited, but either of > those two is inefficient. When TTL is huge (hours, days or months) waiting > can be impractical. Maybe we don't agree here – in production, the wait is essential to retain processing time guarantees (savepoints on stop allow the job to restart and continue with the timer when the job restarts, so long timers are not troublesome). The only delta is testing – where the following case is not typical: "When TTL is huge (hours, days or months) waiting can be impractical." In essence, if we think about it a bit more: Option 2 – doing the right thing – works for both testing and production. The only delta from the status quo is that in the minicluster, we wait for the timers to fire before shutting down the job. This is the minimal change to fix the identified bug (Processing Time CEP is broken in the minicluster). All the other changes impact production and should be handled as a feature request. > How to handle processing time timers with bounded input > --- > > Key: FLINK-18647 > URL: https://issues.apache.org/jira/browse/FLINK-18647 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.11.0 >Reporter: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-critical, auto-deprioritized-major, > stale-minor
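To make option 1 from the ticket description compilable, here is a self-contained restatement; none of these types exist in Flink today, they are the ticket's proposal:

```java
import java.util.concurrent.ScheduledFuture;

// The proposed end-of-input policies for a processing-time timer.
enum TimerAction {
    CANCEL_ON_END_OF_INPUT,
    TRIGGER_ON_END_OF_INPUT,
    WAIT_ON_END_OF_INPUT
}

interface ProcessingTimeCallback {
    void onProcessingTime(long timestamp) throws Exception;
}

// The proposed overload: each timer carries its own end-of-input behavior,
// so e.g. a WindowOperator could register its trailing-window timer with
// TRIGGER_ON_END_OF_INPUT and still fire it on shutdown.
interface ProposedProcessingTimeService {
    ScheduledFuture<?> registerTimer(
            long timestamp, ProcessingTimeCallback target, TimerAction action);
}
```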
[GitHub] [flink] HuangZhenQiu commented on a diff in pull request #20875: (FLINK-29363) allow fully redirection in web dashboard
HuangZhenQiu commented on code in PR #20875: URL: https://github.com/apache/flink/pull/20875#discussion_r976801184 ## flink-runtime-web/web-dashboard/src/app/app.interceptor.ts: ## @@ -39,6 +46,16 @@ export class AppInterceptor implements HttpInterceptor { return next.handle(req.clone({ withCredentials: true })).pipe( catchError(res => { +if ( + res instanceof HttpResponseBase && + (res.status == HttpStatusCode.MovedPermanently || +res.status == HttpStatusCode.TemporaryRedirect || +res.status == HttpStatusCode.SeeOther) && Review Comment: The code path is mainly for fetching job metadata. Multiple Choices, Use Proxy, and Unused do not fit these scenarios or data types. But I am open to adding more status codes to make it more robust.
[GitHub] [flink-kubernetes-operator] usamj commented on a diff in pull request #370: [FLINK-29288] Make it possible to use job jars in the system classpath
usamj commented on code in PR #370: URL: https://github.com/apache/flink-kubernetes-operator/pull/370#discussion_r976786813 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java: ## @@ -87,4 +88,8 @@ public Boolean getAllowNonRestoredState() { } return null; } + +public Boolean isPipelineClasspathDefined() { +return flinkConfig.contains(PipelineOptions.CLASSPATHS); +} Review Comment: As you mentioned, without the decorator the user would have to perform more work to use the Operator. I feel ease of use of the operator is important, which is why I added this responsibility to the operator and believe it should stay.
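For context, the check above guards configurations like the following, where the job jar is already baked into the image and only referenced on the user classpath; the jar path is a placeholder:

```java
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public final class SystemClasspathJobExample {
    public static Configuration buildJobConfig() {
        Configuration conf = new Configuration();
        // pipeline.classpaths: URLs added to the user classpath of the job.
        conf.set(
                PipelineOptions.CLASSPATHS,
                Collections.singletonList("file:///opt/flink/usrlib/my-job.jar"));
        return conf;
    }
}
```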
[jira] [Updated] (FLINK-14896) Kinesis connector doesn't shade jackson dependency
[ https://issues.apache.org/jira/browse/FLINK-14896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-14896: -- Release Note: Shade and relocate transitive Jackson dependencies of {{flink-connector-kinesis}}. If your Flink job was transitively relying on these, you may need to include additional Jackson dependencies in your project. > Kinesis connector doesn't shade jackson dependency > -- > > Key: FLINK-14896 > URL: https://issues.apache.org/jira/browse/FLINK-14896 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.9.0, 1.15.2 > Environment: AWS EMR 5.28.0 >Reporter: Michel Davit >Assignee: Danny Cranmer >Priority: Not a Priority > Labels: pull-request-available > Fix For: 1.17.0 > > Time Spent: 10m > Remaining Estimate: 0h > > flink-kinesis-connector depends on the aws java sdk, which is shaded to > {{org.apache.flink.kinesis.shaded.com.amazonaws}}. > However, the aws sdk has a transitive dependency on jackson which is not > shaded in the artifact. > This creates problems when running flink on YARN: the aws sdk requires > jackson-core v2.6 but hadoop pulls in 2.3. See > [here|https://github.com/apache/flink/blob/e7c11ed672013512e5b159e7e892b27b1ef60a1b/flink-yarn/pom.xml#L133]. > If YARN loads the wrong jackson version from the classpath, the job fails with > {code:java} > 2019-11-20 17:23:11,563 ERROR > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled > exception. org.apache.flink.client.program.ProgramInvocationException: The > program caused an error: at > org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93) > at > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) > at > org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126) > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.NoSuchMethodError: > com.fasterxml.jackson.databind.ObjectMapper.enable([Lcom/fasterxml/jackson/core/JsonParser$Feature;)Lcom/fasterxml/jackson/databind/ObjectMapper; > at > com.amazonaws.partitions.PartitionsLoader.<init>(PartitionsLoader.java:54) > at > com.amazonaws.regions.RegionMetadataFactory.create(RegionMetadataFactory.java:30) > at com.amazonaws.regions.RegionUtils.initialize(RegionUtils.java:65) > at com.amazonaws.regions.RegionUtils.getRegionMetadata(RegionUtils.java:53) > at com.amazonaws.regions.RegionUtils.getRegion(RegionUtils.java:107) at > com.amazonaws.client.builder.AwsClientBuilder.getRegionObject(AwsClientBuilder.java:256) > at > com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:460) > at > com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424) > at > com.amazonaws.client.builder.AwsAsyncClientBuilder.build(AwsAsyncClientBuilder.java:80) > ... > {code} > The flink-kinesis-connector should do as other connectors: shade jackson or > use the flink-shaded-jackson core dependency
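The fix amounts to a maven-shade-plugin relocation along these lines; the exact shaded pattern used by flink-connector-kinesis is an assumption:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <!-- Relocate the AWS SDK's transitive Jackson so it cannot clash
           with the (older) Jackson on the YARN/Hadoop classpath. -->
      <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>org.apache.flink.kinesis.shaded.com.fasterxml.jackson</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```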
[jira] [Updated] (FLINK-14896) Kinesis connector doesn't shade jackson dependency
[ https://issues.apache.org/jira/browse/FLINK-14896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-14896: -- Fix Version/s: 1.17.0 > Kinesis connector doesn't shade jackson dependency > -- > > Key: FLINK-14896 > URL: https://issues.apache.org/jira/browse/FLINK-14896 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.9.0, 1.15.2 > Environment: AWS EMR 5.28.0 >Reporter: Michel Davit >Assignee: Danny Cranmer >Priority: Not a Priority > Labels: pull-request-available > Fix For: 1.17.0
[jira] [Updated] (FLINK-14896) Kinesis connector doesn't shade jackson dependency
[ https://issues.apache.org/jira/browse/FLINK-14896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-14896: -- Affects Version/s: 1.15.2 > Kinesis connector doesn't shade jackson dependency -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-14896) Kinesis connector doesn't shade jackson dependency
[ https://issues.apache.org/jira/browse/FLINK-14896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-14896: - Assignee: Danny Cranmer > Kinesis connector doesn't shade jackson dependency -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20875: (FLINK-29363) allow fully redirection in web dashboard
gaborgsomogyi commented on code in PR #20875: URL: https://github.com/apache/flink/pull/20875#discussion_r976740976 ## flink-runtime-web/web-dashboard/src/app/app.interceptor.ts: ## @@ -39,6 +46,16 @@ export class AppInterceptor implements HttpInterceptor { return next.handle(req.clone({ withCredentials: true })).pipe( catchError(res => { +if ( + res instanceof HttpResponseBase && + (res.status == HttpStatusCode.MovedPermanently || +res.status == HttpStatusCode.TemporaryRedirect || +res.status == HttpStatusCode.SeeOther) && Review Comment: I've taken a look at the RFC and it lists multiple 3xx codes. Just wondering, how did you pick the listed codes? https://user-images.githubusercontent.com/18561820/191558022-abe65674-d545-4595-8cf4-6d6a1b06eabb.png -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] sap1ens commented on a diff in pull request #370: [FLINK-29288] Make it possible to use job jars in the system classpath
sap1ens commented on code in PR #370: URL: https://github.com/apache/flink-kubernetes-operator/pull/370#discussion_r976723478 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java: ## @@ -87,4 +88,8 @@ public Boolean getAllowNonRestoredState() { } return null; } + +public Boolean isPipelineClasspathDefined() { Review Comment: Resolved -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29363) Allow web ui to fully redirect to other page
[ https://issues.apache.org/jira/browse/FLINK-29363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607848#comment-17607848 ] Gabor Somogyi commented on FLINK-29363: --- I've had a deeper look at this issue and here are my findings. {quote}So the setup would be that there's an authenticating proxy between the Flink Web UI and Flink's REST API. The problem is currently that if a REST API call fails, the UI will just break, instead of redirecting to another page. {quote} Yes, this is the main issue. Adding redirect support could be used for authentication renewal and/or routing to a specific IP address based on the user account. {quote}I guess based on the HTTP error codes? {quote} Yes, basically a 401 comes back in case of authentication failure, but handling only that would not be super generic. I think it would be better to handle redirection that is not bound to auth failure. {quote}One problem I see is that this setting is purely used in the UI, so we need a way of forwarding a "global setting" to the UI ... but I guess that's solvable. {quote} Adding unconditional redirect handling wouldn't require any config. Of course we can introduce something if you think it's needed. > Allow web ui to fully redirect to other page > > > Key: FLINK-29363 > URL: https://issues.apache.org/jira/browse/FLINK-29363 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.15.2 >Reporter: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > > In a streaming platform system, the web ui usually integrates with an internal > authentication and authorization system. When validation fails, the > request needs to be redirected to a landing page. This doesn't work for AJAX > requests. It would be great to have the web ui configurable to allow automatic full > redirect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded
[ https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607841#comment-17607841 ] Suhan Mao commented on FLINK-29370: --- [~libenchao] What about removing all com.google.protobuf files when packaging flink-sql-protobuf.jar and relying on users to put the protobuf classes on the classpath? If users must provide the compiled proto classes anyway, a jar with both the compiled proto classes and the google protobuf classes should also be easy to provide. The java API of protobuf is relatively stable, so there should be little conflict with the current implementation. > Protobuf in flink-sql-protobuf is not shaded > > > Key: FLINK-29370 > URL: https://issues.apache.org/jira/browse/FLINK-29370 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Jark Wu >Priority: Blocker > Fix For: 1.16.0, 1.17.0 > > > The protobuf classes in flink-sql-protobuf are not shaded, which may lead to > class conflicts. Usually, sql jars should shade commonly used dependencies, > e.g. flink-sql-avro: > https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88 > > {code} > ➜ Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google > 0 Tue Sep 13 20:23:44 CST 2022 com/google/ > 0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/ >568 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/ProtobufInternalUtils.class > 19218 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$Builder.class >259 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$BuilderParent.class > 10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class > 1486 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class > 12399 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder.class >279 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$InternalOneOfEnu > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29383) Add additionalPrinterColumns definition (PrinterColumn annotation) for some status fields
[ https://issues.apache.org/jira/browse/FLINK-29383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Hao updated FLINK-29383: Summary: Add additionalPrinterColumns definition (PrinterColumn annotation) for some status fields (was: Add additionalPrinterColumns definition for some status fields) > Add additionalPrinterColumns definition (PrinterColumn annotation) for some > status fields > - > > Key: FLINK-29383 > URL: https://issues.apache.org/jira/browse/FLINK-29383 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Xin Hao >Priority: Minor > > We should add additionalPrinterColumns definitions in the CRD so that we can > use > {code:java} > k get flinksessionjob -o wide > {code} > to see the session job statuses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29383) Add additionalPrinterColumns definition for some status fields
[ https://issues.apache.org/jira/browse/FLINK-29383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Hao updated FLINK-29383: Summary: Add additionalPrinterColumns definition for some status fields (was: Add additionalPrinterColumns for some status fields) > Add additionalPrinterColumns definition for some status fields > -- > > Key: FLINK-29383 > URL: https://issues.apache.org/jira/browse/FLINK-29383 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Xin Hao >Priority: Minor > > We should add additionalPrinterColumns definitions in the CRD so that we can > use > {code:java} > k get flinksessionjob -o wide > {code} > to see the session job statuses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29383) Add additionalPrinterColumns for some status fields
Xin Hao created FLINK-29383: --- Summary: Add additionalPrinterColumns for some status fields Key: FLINK-29383 URL: https://issues.apache.org/jira/browse/FLINK-29383 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Xin Hao We should add additionalPrinterColumns definitions in the CRD so that we can use {code:java} k get flinksessionjob -o wide {code} to see the session job statuses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
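For reference, the PrinterColumn annotation mentioned in the updated summary comes from fabric8's CRD generator; a sketch of how a status field could be annotated (the class and column names below are illustrative, not the operator's actual code):
{code:java}
import io.fabric8.crd.generator.annotation.PrinterColumn;

public class FlinkSessionJobStatus {

    // The CRD generator turns this into an additionalPrinterColumns entry,
    // so the field shows up in `kubectl get flinksessionjob -o wide`.
    @PrinterColumn(name = "Job Status")
    private String jobStatus;

    public String getJobStatus() {
        return jobStatus;
    }

    public void setJobStatus(String jobStatus) {
        this.jobStatus = jobStatus;
    }
}
{code}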
[GitHub] [flink-kubernetes-operator] haoxins opened a new pull request, #378: Add PrinterColumn annotation for status fields
haoxins opened a new pull request, #378: URL: https://github.com/apache/flink-kubernetes-operator/pull/378 ## What is the purpose of the change *(For example: This pull request adds a new feature to periodically create and maintain savepoints through the `FlinkDeployment` custom resource.)* ## Brief change log *(for example:)* - *Periodic savepoint trigger is introduced to the custom resource* - *The operator checks on reconciliation whether the required time has passed* - *The JobManager's dispose savepoint API is used to clean up obsolete savepoints* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / no) - Core observer or reconciler logic that is regularly executed: (yes / no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29363) Allow web ui to fully redirect to other page
[ https://issues.apache.org/jira/browse/FLINK-29363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607823#comment-17607823 ] Zhenqiu Huang commented on FLINK-29363: --- Yes, our scenario is exactly the same as what [~rmetzger] explained. [~martijnvisser] Yes, we already have an auth proxy globally. But the proxy server our team built is meant to limit the access of the job owner (who has already been authenticated) to the web ui of jobs running in a k8s cluster. The proxy server runs in a k8s cluster as part of the control plane for all of the flink jobs. The setting is required by our security team. Basically, AJAX requests need to attach a cookie for access. If the cookie expires, we need a way to redirect users to our platform's landing page. > Allow web ui to fully redirect to other page -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #20875: (FLINK-29363) allow fully redirection in web dashboard
flinkbot commented on PR #20875: URL: https://github.com/apache/flink/pull/20875#issuecomment-1253876906 ## CI report: * 4a8f9f9f850c33780f9f0d306cd8d3842b67658e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29363) Allow web ui to fully redirect to other page
[ https://issues.apache.org/jira/browse/FLINK-29363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29363: --- Labels: pull-request-available (was: ) > Allow web ui to fully redirect to other page -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] HuangZhenQiu opened a new pull request, #20875: (FLINK-29363) allow fully redirection in web dashboard
HuangZhenQiu opened a new pull request, #20875: URL: https://github.com/apache/flink/pull/20875 What is the purpose of the change: Allow full redirection in the web dashboard when the response status code is MovedPermanently, TemporaryRedirect, or SeeOther. Brief change log: Change the AppInterceptor to perform the redirection. Verifying this change: This change is a trivial rework / code cleanup without any test coverage. Does this pull request potentially affect one of the following parts: Dependencies (does it add or upgrade a dependency): (no) The public API, i.e., is any changed class annotated with @Public(Evolving): (no) The serializers: (no) The runtime per-record code paths (performance sensitive): (no) Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-29152) Describe statement results are different from Hive
[ https://issues.apache.org/jira/browse/FLINK-29152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-29152. --- Fix Version/s: 1.16.0 1.17.0 Assignee: luoyuxia Resolution: Fixed Fixed in - master: b5cd9f34ab73fa69a3db5a09908c1aa954ed0597 - release-1.16: a6e954ca3bff9c62713d475627b49dd18a4f02fd > Describe statement results are different from Hive > -- > > Key: FLINK-29152 > URL: https://issues.apache.org/jira/browse/FLINK-29152 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Shengkai Fang >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.17.0 > > > In hive, the results schema is > {code:java} > +---++--+ > | col_name | data_type | comment | > +---++--+ > | a | int| | > | b | string | | > +---++--+ > {code} > but our implementation is > {code:java} > 0: jdbc:hive2://localhost:1/default> describe sink; > +---+---+---+---+-++ > | name | type | null | key | extras | watermark | > +---+---+---+---+-++ > | a | INT | true | NULL | NULL| NULL | > +---+---+---+---+-++ > {code} > BTW, it would be better if we could also support {{DESCRIBE FORMATTED}} like hive does. > {code:java} > +---++---+ > | col_name| data_type >|comment| > +---++---+ > | # col_name| data_type >| comment | > | | NULL >| NULL | > | a | int >| | > | b | string >| | > | | NULL >| NULL | > | # Detailed Table Information | NULL >| NULL | > | Database: | default >| NULL | > | Owner:| null >| NULL | > | CreateTime: | Tue Aug 30 06:54:00 UTC 2022 >| NULL | > | LastAccessTime: | UNKNOWN >| NULL | > | Retention:| 0 >| NULL | > | Location: | > hdfs://namenode:8020/user/hive/warehouse/sink | NULL | > | Table Type: | MANAGED_TABLE >| NULL | > | Table Parameters: | NULL >| NULL | > | | comment >| | > | | numFiles >| 0 | > | | totalSize >| 0 | > | | transient_lastDdlTime >| 1661842440| > | | NULL >| NULL | > | # Storage Information | NULL >| NULL | > | SerDe Library:| > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL | > | InputFormat: | org.apache.hadoop.mapred.TextInputFormat >| NULL | > | OutputFormat: | > org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL > | > | Compressed: | No >| NULL | > | Num Buckets: | -1 >| NULL | > | Bucket C
[GitHub] [flink] wuchong merged pull request #20789: [FLINK-29152][hive] fix inconsistent behavior with Hive for `desc table` in Hive dialect
wuchong merged PR #20789: URL: https://github.com/apache/flink/pull/20789 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-29151) SHOW CREATE TABLE doesn't work for Hive dialect
[ https://issues.apache.org/jira/browse/FLINK-29151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-29151. --- Resolution: Fixed > SHOW CREATE TABLE doesn't work for Hive dialect > --- > > Key: FLINK-29151 > URL: https://issues.apache.org/jira/browse/FLINK-29151 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Shengkai Fang >Assignee: luoyuxia >Priority: Major > Fix For: 1.16.0, 1.17.0 > > > {code:java} > 0: jdbc:hive2://localhost:1/default> show create table sink; > Error: org.apache.flink.table.gateway.service.utils.SqlExecutionException: > Failed to execute the operation 9b060771-34b8-453d-abf5-674c86b62921. > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:389) > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:248) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.table.api.ValidationException > at > org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer.handleUnsupportedOperation(HiveParserDDLSemanticAnalyzer.java:2188) > at > org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer.convertToOperation(HiveParserDDLSemanticAnalyzer.java:414) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:334) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:213) > at > org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:90) > at > org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$0(SqlGatewayServiceImpl.java:182) > at > org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:111) > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:239) > ... 7 more > Caused by: java.lang.UnsupportedOperationException: Unsupported operation: > TOK_SHOW_CREATETABLE > ... 15 more (state=,code=0) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29151) SHOW CREATE TABLE doesn't work for Hive dialect
[ https://issues.apache.org/jira/browse/FLINK-29151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607566#comment-17607566 ] Jark Wu edited comment on FLINK-29151 at 9/21/22 3:09 PM: -- Fixed in - master: 7ddf059d3b7b6888f550bfca9eb09c0cdeb7d682 - release-1.16: 8a5eec9945a3cdbd35b934508586803f470a3f2a was (Author: jark): Fixed in - master: 7ddf059d3b7b6888f550bfca9eb09c0cdeb7d682 - release-1.16: TODO > SHOW CREATE TABLE doesn't work for Hive dialect -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #313: [FLINK-27852][docs] OLM installation and development documentation
gyfora commented on PR #313: URL: https://github.com/apache/flink-kubernetes-operator/pull/313#issuecomment-1253848089 Yes @tedhtchang please do that. If you could write a small proposal of the benefits and what it would mean with regards to maintenance, release process etc compared to what we have now :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28755) Error when switching from stateless to savepoint upgrade mode
[ https://issues.apache.org/jira/browse/FLINK-28755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-28755. -- Resolution: Fixed merged to main 1eca2c5e6a62bd9f1c9e752191f8a7477903d73c > Error when switching from stateless to savepoint upgrade mode > - > > Key: FLINK-28755 > URL: https://issues.apache.org/jira/browse/FLINK-28755 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0, kubernetes-operator-1.2.0 >Reporter: Gyula Fora >Assignee: Gabor Somogyi >Priority: Major > Labels: pull-request-available > > When using the savepoint upgrade mode the state.savepoints.dir currently > comes from the currently deployed spec / config. > This causes a nullpointer exception when switching to savepoint upgrade mode > from stateless if state.savepoints.dir was previously undefined: > {noformat} > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) > org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:279) > org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:93) > org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:172) > org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:52) > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:108) > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:148) > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:56) > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:115){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
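The defensive check the fix implies would look roughly like this (a sketch only, not the merged operator code; CheckpointingOptions.SAVEPOINT_DIRECTORY is the Flink option backing state.savepoints.dir):
{code:java}
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

public class SavepointDirCheck {

    // Fail with a clear message instead of a NullPointerException when the
    // savepoint directory was never configured in the deployed spec.
    static String requireSavepointDir(Configuration conf) {
        String dir = conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY);
        if (dir == null) {
            throw new IllegalStateException(
                    "Savepoint upgrade mode requires state.savepoints.dir to be set");
        }
        return dir;
    }
}
{code}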
[GitHub] [flink] wuchong merged pull request #20866: [BP-1.16][FLINK-29152][FLINK-29152][hive] Backport to release-1.16
wuchong merged PR #20866: URL: https://github.com/apache/flink/pull/20866 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #376: [FLINK-28755] Fix error when switching from stateless to savepoint upgrade mode
gyfora merged PR #376: URL: https://github.com/apache/flink-kubernetes-operator/pull/376 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wuchong commented on pull request #20869: [FLINK-29219][table] Fix CREATE TABLE AS statement blocks SQL client's execution
wuchong commented on PR #20869: URL: https://github.com/apache/flink/pull/20869#issuecomment-1253845113 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wuchong commented on a diff in pull request #20653: [FLINK-29020][docs] add document for CTAS feature
wuchong commented on code in PR #20653: URL: https://github.com/apache/flink/pull/20653#discussion_r976322033 ## docs/content/docs/dev/table/sql/create.md: ## @@ -513,6 +516,48 @@ If you provide no like options, `INCLUDING ALL OVERWRITING OPTIONS` will be used **NOTE** The `source_table` can be a compound identifier. Thus, it can be a table from a different catalog or database: e.g. `my_catalog.my_db.MyTable` specifies table `MyTable` from catalog `MyCatalog` and database `my_db`; `my_db.MyTable` specifies table `MyTable` from current catalog and database `my_db`. +### `AS` Review Comment: ```suggestion ### `AS select_statement` ``` ## docs/content/docs/dev/table/sql/create.md: ## @@ -513,6 +516,48 @@ If you provide no like options, `INCLUDING ALL OVERWRITING OPTIONS` will be used **NOTE** The `source_table` can be a compound identifier. Thus, it can be a table from a different catalog or database: e.g. `my_catalog.my_db.MyTable` specifies table `MyTable` from catalog `MyCatalog` and database `my_db`; `my_db.MyTable` specifies table `MyTable` from current catalog and database `my_db`. +### `AS` + +Tables can also be created and populated by the results of a query in one create-table-as-select (CTAS) statement. CTAS is the simplest and fastest way to create and insert data into a table with a single command. + +There are two parts in CTAS, the SELECT part can be any [SELECT query]({{< ref "docs/dev/table/sql/queries/overview" >}}) supported by Flink SQL. The CREATE part takes the resulting schema from the SELECT part and creates the target table. Similar to `CREATE TABLE`, CTAS requires the required options of the target table must be specified in WITH clause. + +Creating the target table of CTAS depends on Catalog, so if using the built-in memory Catalog, users must ensure that the table already exists in external storage. If using other catalogs such as hive Catalog, the target table will be created by Catalog automatically. Review Comment: ```suggestion The creating table operation of CTAS depends on the target Catalog. For example, Hive Catalog creates the physical table in Hive automatically. But the in-memory catalog just registers the table metadata in the memory of the client executing the SQL. ``` How about modifying like this? ## docs/content/docs/dev/table/sql/create.md: ## @@ -184,6 +184,9 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } }[, ...] +: +The table is populated using the data from the select query. Review Comment: You can explain the meaning of select_query in the following section. A BNF notation of syntax should only contain syntax, not descriptions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
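For readers following this review, a minimal sketch of the CTAS statement being documented, driven from the Java Table API (the table names and connector options are illustrative):
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtasSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE source_table (id INT, name STRING) "
                        + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

        // CTAS: the target schema is derived from the SELECT part, while the
        // WITH clause still carries the target connector's required options.
        tEnv.executeSql(
                "CREATE TABLE sink_table WITH ('connector' = 'print') "
                        + "AS SELECT id, name FROM source_table");
    }
}
```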
[GitHub] [flink] snuyanzin commented on pull request #20805: [FLINK-29198][test] Fail after maximum RetryOnException
snuyanzin commented on PR #20805: URL: https://github.com/apache/flink/pull/20805#issuecomment-1253826144 Thanks for the contribution. I think it would be great to have a test here confirming that the change fixes the issue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
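For context, the extension under discussion is used roughly like this (the annotation attributes below are assumptions based on Flink's test utilities, so treat the exact names as approximate):
```java
import java.io.IOException;

import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(RetryExtension.class)
class FlakyServiceTest {

    // Retried on IOException; after the maximum number of attempts the test
    // must fail instead of being silently skipped, which is what the PR fixes.
    @TestTemplate
    @RetryOnException(times = 3, exception = IOException.class)
    void talksToAFlakyService() throws IOException {
        // test body interacting with an unreliable resource
    }
}
```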
[jira] [Updated] (FLINK-29373) DataStream to table does not support BigDecimalTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-29373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-29373: --- Affects Version/s: (was: 1.17.0) > DataStream to table does not support BigDecimalTypeInfo > -- > > Key: FLINK-29373 > URL: https://issues.apache.org/jira/browse/FLINK-29373 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: hk__lrzy >Priority: Major > Attachments: image-2022-09-21-15-12-11-082.png > > > When we try to transform a datastream to a table, *TypeInfoDataTypeConverter* > will try to convert *TypeInformation* to {*}DataType{*}, but if the datastream's > produced type contains {*}BigDecimalTypeInfo{*}, *TypeInfoDataTypeConverter* > will finally convert it to a {*}RawDataType{*}; then when we want to transform the table > back to a datastream, an exception will happen showing that the data types do not match. > The Blink planner also has this exception. > !image-2022-09-21-15-12-11-082.png! > > {code:java} > Caused by: org.apache.flink.table.codegen.CodeGenException: Incompatible > types of expression and result type. > Expression[GeneratedExpression(result$10,isNull$11,,Decimal(9,2),false)] type > is [Decimal(9,2)], result type is [GenericType] {code} > How to reproduce: > {code:java} > // code placeholder > StreamExecutionEnvironment executionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance() > .useOldPlanner() > .inStreamingMode(); > StreamTableEnvironment streamTableEnvironment = > StreamTableEnvironment.create(executionEnvironment, envBuilder.build()); > FromElementsFunction<BigDecimal> fromElementsFunction = new FromElementsFunction<>(new > BigDecimal(1.11D)); > DataStreamSource<BigDecimal> dataStreamSource = > executionEnvironment.addSource(fromElementsFunction, new > BigDecimalTypeInfo(10, 8)); > streamTableEnvironment.createTemporaryView("tmp", dataStreamSource); > Table table = streamTableEnvironment.sqlQuery("select * from tmp"); > streamTableEnvironment.toRetractStream(table, table.getSchema().toRowType()); > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
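One possible workaround, assuming a recent Flink version, is to declare the DECIMAL type explicitly when converting the stream instead of relying on TypeInformation extraction (a sketch; whether the coercion from the raw BigDecimal type applies can depend on the version):
{code:java}
import java.math.BigDecimal;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DecimalMappingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<BigDecimal> stream = env.fromElements(new BigDecimal("1.11"));

        // Declaring DECIMAL(10, 8) explicitly avoids the RAW fallback that the
        // TypeInformation -> DataType conversion produces for BigDecimal.
        Table table =
                tEnv.fromDataStream(
                        stream,
                        Schema.newBuilder()
                                .column("f0", DataTypes.DECIMAL(10, 8))
                                .build());
        table.execute().print();
    }
}
{code}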
[jira] [Commented] (FLINK-29373) DataStream to table does not support BigDecimalTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-29373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607791#comment-17607791 ] Martijn Visser commented on FLINK-29373: [~hk__lrzy] Which version of Flink are you using? There's no Blink planner in recent versions. Please make sure that your affected version is correct. > DataStream to table does not support BigDecimalTypeInfo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-26203) Support Table API in Pulsar Connector
[ https://issues.apache.org/jira/browse/FLINK-26203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-26203: -- Assignee: Yufei Zhang > Support Table API in Pulsar Connector > - > > Key: FLINK-26203 > URL: https://issues.apache.org/jira/browse/FLINK-26203 > Project: Flink > Issue Type: New Feature > Components: Connectors / Pulsar >Reporter: Yufei Zhang >Assignee: Yufei Zhang >Priority: Minor > Labels: Pulsar, auto-deprioritized-major > > Currently Pulsar connector only supports DataStream API. We plan to support > Table API as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26203) Support Table API in Pulsar Connector
[ https://issues.apache.org/jira/browse/FLINK-26203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607789#comment-17607789 ] Martijn Visser commented on FLINK-26203: [~syhily] Done. > Support Table API in Pulsar Connector > - > > Key: FLINK-26203 > URL: https://issues.apache.org/jira/browse/FLINK-26203 > Project: Flink > Issue Type: New Feature > Components: Connectors / Pulsar >Reporter: Yufei Zhang >Assignee: Yufei Zhang >Priority: Minor > Labels: Pulsar, auto-deprioritized-major > > Currently Pulsar connector only supports DataStream API. We plan to support > Table API as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #377: [FLINK-28979] add owner reference to flink deployment object
gyfora commented on PR #377: URL: https://github.com/apache/flink-kubernetes-operator/pull/377#issuecomment-1253794569 Also since now we support standalone integration we should probably add the owner reference for both JM and TM deployments there. That should be part of the PR + some tests :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-29278) BINARY type is not supported in table store
[ https://issues.apache.org/jira/browse/FLINK-29278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-29278. Assignee: Nicholas Jiang Resolution: Fixed master: 422ea6072b251b04041b6ca8a738316a30069aad release-0.2: 3eee4bf4eddd6d22a0225e1958601a44f7dd9ba7 > BINARY type is not supported in table store > --- > > Key: FLINK-29278 > URL: https://issues.apache.org/jira/browse/FLINK-29278 > Project: Flink > Issue Type: Bug > Components: Table Store >Reporter: Jingsong Lee >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0, table-store-0.2.1 > > Attachments: image-2022-09-13-15-21-55-116.png > > > !image-2022-09-13-15-21-55-116.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29375) Move getSelfGateway into RpcService
[ https://issues.apache.org/jira/browse/FLINK-29375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29375: --- Labels: pull-request-available (was: ) > Move getSelfGateway into RpcService > --- > > Key: FLINK-29375 > URL: https://issues.apache.org/jira/browse/FLINK-29375 > Project: Flink > Issue Type: Sub-task > Components: Runtime / RPC >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Self gateways are a tricky thing and we should give the RPC implementation > control over how they are achieved. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] XComp commented on a diff in pull request #20870: [FLINK-29375][rpc] Move getSelfGateway() into RpcService
XComp commented on code in PR #20870: URL: https://github.com/apache/flink/pull/20870#discussion_r976555879 ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java: ## @@ -197,6 +197,20 @@ public int getPort() { return port; } +public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer) { +if (selfGatewayType.isInstance(rpcServer)) { +@SuppressWarnings("unchecked") +C selfGateway = ((C) rpcServer); + +return selfGateway; +} else { +throw new RuntimeException( Review Comment: I'm wondering whether `ClassCastException` is the better exception here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
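A sketch of what the suggested variant could look like: `Class#cast` already raises a `ClassCastException` with a descriptive message, so the explicit instanceof branch can collapse into one call (illustrative only, not the merged code):
```java
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;

final class SelfGatewaySketch {

    // Class#cast performs the same isInstance check internally and throws
    // ClassCastException on mismatch, removing the manual unchecked cast.
    static <C extends RpcGateway> C getSelfGateway(
            Class<C> selfGatewayType, RpcServer rpcServer) {
        return selfGatewayType.cast(rpcServer);
    }
}
```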
[jira] [Commented] (FLINK-29382) Flink fails to start when created using quick guide for flink operator
[ https://issues.apache.org/jira/browse/FLINK-29382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607770#comment-17607770 ] Barisa commented on FLINK-29382: You are right, I'm not running on minikube, I'm using Kubernetes > Flink fails to start when created using quick guide for flink operator > -- > > Key: FLINK-29382 > URL: https://issues.apache.org/jira/browse/FLINK-29382 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0 >Reporter: Barisa >Priority: Major > > I followed > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/] > to deploy the flink operator and then the flink job. > > > When following the step > {{kubectl create -f > https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.1/examples/basic.yaml}} > the pod starts, but then it keeps crashing with the following exception. > > {noformat} > Caused by: io.fabric8.kubernetes.client.KubernetesClientException: pods is > forbidden: User "system:anonymous" cannot watch resource "pods" in API group > "" in the namespace "zonda" > at > io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.onFailure(WatcherWebSocketListener.java:74) > ~[flink-dist-1.15.2.jar:1.15.2] > at > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:570) > ~[flink-dist-1.15.2.jar:1.15.2] > at > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$1.onResponse(RealWebSocket.java:199) > ~[flink-dist-1.15.2.jar:1.15.2] > at > org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:174) > ~[flink-dist-1.15.2.jar:1.15.2] > at > org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) > ~[flink-dist-1.15.2.jar:1.15.2] > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > ~[?:?] > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > ~[?:?] > {noformat} > I also noticed the following log lines > {noformat} > 2022-09-21 13:32:05,715 WARN io.fabric8.kubernetes.client.Config > [] - Error reading service account token from: > [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring. > 2022-09-21 13:32:05,719 WARN io.fabric8.kubernetes.client.Config > [] - Error reading service account token from: > [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring. > {noformat} > I think the problem is that the container runs as user root, which later uses > gosu to become the flink user. However, the service account token is only accessible to the > main user in the container, which is root > {noformat} > root@basic-example-658578895d-qwlb2:/opt/flink# ls -hltr > /var/run/secrets/kubernetes.io/serviceaccount/token > lrwxrwxrwx. 1 root 1337 12 Sep 21 08:57 > /var/run/secrets/kubernetes.io/serviceaccount/token -> ..data/token > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #377: [FLINK-28979] add owner reference to flink deployment object
gyfora commented on PR #377: URL: https://github.com/apache/flink-kubernetes-operator/pull/377#issuecomment-1253749120 What I don't understand is how this would interact with the CR deletion process. Currently when the user deletes a Flink CR the operator itself cleans up the dependent resources (deployment, HA configmaps). What would happen if the owner reference is set? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] zezaeoh commented on pull request #377: [FLINK-28979] add owner reference to flink deployment object
zezaeoh commented on PR #377: URL: https://github.com/apache/flink-kubernetes-operator/pull/377#issuecomment-1253728010 Hi there, thanks for the comments! We can find out what an owner reference is, and what it can do in kubernetes, in the following docs: * https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/ * https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#OwnerReference What I understand is that there are no other effects when `blockOwnerDeletion` is set to `false`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
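For illustration, attaching such an owner reference with the fabric8 client might look like the following (the apiVersion and kind match the operator's CRD, but the helper itself is hypothetical):
```java
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;

final class OwnerReferenceSketch {

    // Marks the FlinkDeployment CR as the owner of a JM/TM Deployment so that
    // Kubernetes garbage-collects the Deployment when the CR is deleted.
    // blockOwnerDeletion=false means the reference never delays CR deletion.
    static void addOwnerReference(Deployment deployment, String crName, String crUid) {
        OwnerReference ref =
                new OwnerReferenceBuilder()
                        .withApiVersion("flink.apache.org/v1beta1")
                        .withKind("FlinkDeployment")
                        .withName(crName)
                        .withUid(crUid)
                        .withController(true)
                        .withBlockOwnerDeletion(false)
                        .build();
        deployment.getMetadata().getOwnerReferences().add(ref);
    }
}
```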
[jira] [Updated] (FLINK-29382) Flink fails to start when created using quick guide for flink operator
[ https://issues.apache.org/jira/browse/FLINK-29382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-29382: --- Affects Version/s: kubernetes-operator-1.1.0 (was: 1.15.2) > Flink fails to start when created using quick guide for flink operator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29382) Flink fails to start when created using quick guide for flink operator
[ https://issues.apache.org/jira/browse/FLINK-29382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607757#comment-17607757 ] Gyula Fora commented on FLINK-29382: What environment are you running in? The quickstart is for minikube. > Flink fails to start when created using quick guide for flink operator -- This message was sent by Atlassian Jira (v8.20.10#820010)